Source code for arborist.cli

"""Arborist CLI: ingest / search / verify / stats."""

from __future__ import annotations

import argparse
import json
import os
import sys
from pathlib import Path

from arborist import __version__
from arborist.ingest import ingest_source, verify_random_sample
from arborist.progress import Progress
from arborist.search import FTS5Backend
from arborist.sources import WikipediaCurDump
from arborist.store import (
    DEFAULT_DB_PATH,
    append_audit,
    connect,
    connect_query,
    connect_readonly,
    stats,
    transaction,
)


def _cmd_ingest(args: argparse.Namespace) -> int:
    """Ingest a corpus (Wikipedia, HTML, git, etc.) into a shard."""
    if args.source in ("wikipedia_cur", "wikipedia_old"):
        if not args.path:
            print(f"--path is required for {args.source}", file=sys.stderr)
            return 2
        from arborist.sources import WikipediaSqlDump

        table = "cur" if args.source == "wikipedia_cur" else "old"
        shard = None
        if args.shard:
            rank_str, total_str = args.shard.split("/", 1)
            shard = (int(rank_str), int(total_str))
        src = WikipediaSqlDump(path=args.path, table=table, shard=shard)
    elif args.source == "html":  # noqa: SIM114 — keep branch shape
        try:
            from arborist.sources import HtmlPageSource
        except ImportError:
            print(
                "html source requires extras: pip install 'arborist[html]'",
                file=sys.stderr,
            )
            return 2
        urls: list[str] = list(args.url or [])
        if args.urls_from:
            urls.extend(
                line.strip()
                for line in Path(args.urls_from).read_text(encoding="utf-8").splitlines()
                if line.strip() and not line.lstrip().startswith("#")
            )
        if not urls:
            print("html source needs --url or --urls-from", file=sys.stderr)
            return 2
        src = HtmlPageSource(
            urls,
            respect_robots=not args.no_robots,
            default_author=getattr(args, "author", None),
        )
    elif args.source in ("grok_export", "grok_media"):
        if not args.path:
            print(f"--path is required for {args.source}", file=sys.stderr)
            return 2
        from arborist.sources import GrokExportSource, GrokMediaPostsSource

        cls = GrokExportSource if args.source == "grok_export" else GrokMediaPostsSource
        src = cls(path=args.path)
    elif args.source == "textbook_tex":
        # PG-style LaTeX source ingest (#000031). Each --url or --bundle
        # points at a TeX URL (e.g., PG eBook /files/N/N-t/N-t.tex).
        # `--urls-from FILE` for a list. The strip-tex pipeline produces
        # plain prose suitable for the standard 512-token chunker.
        urls: list[str] = list(getattr(args, "url", None) or [])
        if getattr(args, "bundle", None):
            urls.extend(args.bundle)
        if args.urls_from:
            urls.extend(
                line.strip()
                for line in Path(args.urls_from).read_text(encoding="utf-8").splitlines()
                if line.strip() and not line.lstrip().startswith("#")
            )
        if not urls:
            print(
                "textbook_tex source needs --url, --bundle, or --urls-from",
                file=sys.stderr,
            )
            return 2
        from arborist.sources import TextbookTexSource

        src = TextbookTexSource(
            urls,
            default_author=getattr(args, "author", None),
        )
    elif args.source == "claim_pack":
        # Companion JSON bundles (axiom + theorem packs) — see ticket
        # #000029. --bundle is repeatable so axiom-bundle and theorem-bundle
        # parse together, allowing cross-bundle pillar_reference edges to
        # resolve to specific record URIs. A single --path also works for
        # one-bundle ingest.
        paths: list[str] = []
        if getattr(args, "bundle", None):
            paths.extend(args.bundle)
        if args.path:
            paths.append(args.path)
        if not paths:
            print(
                "claim_pack needs --bundle (repeatable) or --path",
                file=sys.stderr,
            )
            return 2
        from arborist.sources import ClaimPackSource

        src = ClaimPackSource(paths)
    elif args.source in ("wikipedia_xml", "wikipedia_xml_history", "wikipedia_abstract"):
        if not args.path:
            print(f"--path is required for {args.source}", file=sys.stderr)
            return 2
        from arborist.sources import WikipediaAbstractDump, WikipediaXmlDump

        if args.source == "wikipedia_abstract":
            src = WikipediaAbstractDump(path=args.path)
        else:
            shard = None
            if args.shard:
                rank_str, total_str = args.shard.split("/", 1)
                shard = (int(rank_str), int(total_str))
            src = WikipediaXmlDump(
                path=args.path,
                shard=shard,
                multi_revision=(args.source == "wikipedia_xml_history"),
            )
    elif args.source in ("git_repo", "hg_repo"):
        if not args.path:
            print(f"--path is required for {args.source}", file=sys.stderr)
            return 2
        from arborist.sources import GitRepoSource, MercurialRepoSource

        cls = GitRepoSource if args.source == "git_repo" else MercurialRepoSource
        src = cls(repo_path=args.path)
    elif args.source == "providence":
        # Self-reference: promote STRICT live providence_cache records
        # past the kindergarten window into the document corpus.
        # See docs/self-reference-design.md.
        #
        # NOTE: ``connect`` lives at module scope (line 19). Re-importing
        # it inside this branch makes Python's compiler treat ``connect``
        # as a function-local for the entire ``_cmd_ingest`` body, which
        # breaks the module-level binding used at line 177 for *every*
        # non-providence source. Don't add a local re-import here.
        from arborist.sources.providence import (
            DEFAULT_KINDERGARTEN_SECONDS,
            ProvidenceSource,
        )

        # The source reads from the SAME shard it's writing into —
        # promote each shard's own STRICT records to its own
        # documents table. Cross-shard promotion runs as a separate
        # invocation per shard.
        target_db_for_read = args.db
        if args.shards_dir and args.shard:
            rank_str, total_str = args.shard.split("/", 1)
            rank = int(rank_str)
            total = int(total_str)
            digits = max(3, len(str(total - 1)))
            target_db_for_read = Path(args.shards_dir) / f"{rank:0{digits}d}.db"
        if not target_db_for_read:
            print("--db or --shards-dir + --shard required for providence source", file=sys.stderr)
            return 2
        kg_seconds = int(getattr(args, "kindergarten_seconds", None) or DEFAULT_KINDERGARTEN_SECONDS)
        # Open a separate connection for reading; ingest opens its own
        # write connection downstream.
        read_conn = connect(target_db_for_read)
        src = ProvidenceSource(read_conn, kindergarten_seconds=kg_seconds)
    else:
        print(f"unknown source: {args.source}", file=sys.stderr)
        return 2

    # Resolve target DB: if --shards-dir is set with --shard, write to a
    # per-shard file. Each shard owns its own SQLite file, so N parallel
    # ingests have ZERO writer-lock contention.
    target_db = args.db
    if args.shards_dir:
        if not args.shard:
            print(
                "--shards-dir requires --shard rank/total",
                file=sys.stderr,
            )
            return 2
        rank_str, total_str = args.shard.split("/", 1)
        rank = int(rank_str)
        total = int(total_str)
        shards_dir = Path(args.shards_dir)
        shards_dir.mkdir(parents=True, exist_ok=True)
        digits = max(3, len(str(total - 1)))
        target_db = shards_dir / f"{rank:0{digits}d}.db"

    progress: Progress | None = None
    if not args.quiet:
        prefix = ""
        if args.shard:
            prefix = f"[shard {args.shard}] "
        progress = Progress(
            interval=args.progress_interval,
            total_estimate=args.total_estimate,
            prefix=prefix,
        )

    conn = connect(target_db)
    embedded = None
    try:
        result = ingest_source(
            conn,
            src,
            chunker_name=args.chunker,
            limit=args.limit,
            batch_size=args.batch_size,
            resume=args.resume,
            progress=progress,
            loss_report_enabled=not args.no_loss_report,
            loss_report_excerpts=not args.no_loss_excerpts,
            loss_report_max_excerpt_bytes=args.loss_excerpt_bytes,
        )
        if getattr(args, "embed", False):
            # Eager opt-in: after the chunk+Merkle-commit pass, embed the
            # chunks this run added (incremental — only chunk_ids not
            # already in chunk_vecs). Default ingest does NOT embed; the
            # lazy out-of-band pass (`arborist embed`, or a cron, or the
            # Prometheus-Σ unconscious sweep) is the usual path. See #000039.
            from arborist.search.vec import embed_documents
            embedded = embed_documents(conn, incremental=True)
    finally:
        conn.close()
    out = dict(result.__dict__)
    if embedded is not None:
        out["chunks_embedded"] = embedded
    print(json.dumps(out, indent=2, ensure_ascii=False))
    return 0


def _cmd_search(args: argparse.Namespace) -> int:
    """Lexical (FTS5) or semantic (vec) query against chunks; hits as JSON/text."""
    backend_name = getattr(args, "backend", "fts5")
    conn = (
        connect_query(args.db, shards_dir=args.global_shards_dir)
        if args.global_shards_dir
        else connect(args.db)
    )
    try:
        if backend_name == "vec":
            from arborist.search import VecBackend  # type: ignore
            backend = VecBackend(conn)
            if not backend.populated():
                print(
                    "chunk_vecs is empty or missing on this DB — run "
                    "`arborist --db <db> embed` first.",
                    file=sys.stderr,
                )
                return 1
        else:
            backend = FTS5Backend(conn)
        hits = backend.search(args.query, limit=args.limit)
    finally:
        conn.close()
    if args.json:
        print(
            json.dumps(
                [
                    {
                        "document_root": h.document_root,
                        "document_uri": h.document_uri,
                        "chunk_idx": h.chunk_idx,
                        "snippet": h.snippet,
                        "score": h.score,
                        "audit_mode": h.audit_mode.value,
                        "title": h.title,
                    }
                    for h in hits
                ],
                indent=2, ensure_ascii=False
            )
        )
    else:
        for h in hits:
            print(f"[{h.audit_mode.value}] {h.score:7.3f}  {h.title or h.document_uri}")
            print(f"    chunk {h.chunk_idx}  root={h.document_root[:16]}…")
            print(f"    {h.snippet}")
            print()
    return 0


def _cmd_embed(args: argparse.Namespace) -> int:
    """Populate chunk_vecs (semantic embeddings) for the --db shard.

    Writes a sibling vec0 virtual table; does not touch chunks /
    documents / the audit chain. First run loads (and downloads) the
    fastembed bge-small-en-v1.5 ONNX model (~130 MB).
    """
    import time as _time

    from arborist.search.vec import embed_documents, vec_backend_version

    quant = getattr(args, "quant", "float32")
    rebuild = getattr(args, "rebuild", False)
    conn = connect(args.db)
    t0 = _time.monotonic()
    last_print = [0.0]

    def _progress(done: int, total: int) -> None:
        now = _time.monotonic()
        if now - last_print[0] >= 1.0 or done == total:
            pct = (100.0 * done / total) if total else 100.0
            rate = done / max(now - t0, 1e-6)
            print(
                f"  embedded {done}/{total} chunks ({pct:.1f}%, {rate:.0f}/s)",
                file=sys.stderr,
            )
            last_print[0] = now

    try:
        n = embed_documents(
            conn,
            limit=args.limit,
            batch_size=args.batch_size,
            incremental=not rebuild,
            rebuild=rebuild,
            quant=quant,
            progress=_progress,
        )
    except ValueError as e:
        print(f"embed error: {e}", file=sys.stderr)
        return 2
    finally:
        conn.close()
    elapsed = _time.monotonic() - t0
    print(json.dumps({
        "db": str(args.db),
        "backend_version": vec_backend_version(quant),
        "quant": quant,
        "mode": "rebuild" if rebuild else "incremental",
        "chunks_embedded": n,
        "elapsed_s": round(elapsed, 2),
        "rate_per_s": round(n / max(elapsed, 1e-6), 1),
    }, indent=2))
    return 0


def _cmd_verify(args: argparse.Namespace) -> int:
    """Round-trip Merkle proofs on N random documents (chunk 0 each)."""
    conn = (
        connect_query(args.db, shards_dir=args.global_shards_dir)
        if args.global_shards_dir
        else connect(args.db)
    )
    try:
        result = verify_random_sample(conn, n=args.n)
    finally:
        conn.close()
    print(json.dumps(result, indent=2, ensure_ascii=False))
    return 0 if result["failed"] == 0 else 1


def _cmd_distill(args: argparse.Namespace) -> int:
    """Distill existing documents into cores (surface→core, or core→core+1)."""
    from arborist.distill import get_distiller
    from arborist.distill.runner import distill_existing
    from arborist.store import discover_shards

    try:
        distiller = get_distiller(args.process)
    except ValueError as e:
        print(str(e), file=sys.stderr)
        return 2

    # Sharded mode: iterate over each shard's DB and distill in place.
    # Cores stay in their source shard so the per-shard audit/derivation
    # chains remain self-contained.
    if args.global_shards_dir:
        shard_paths = discover_shards(args.global_shards_dir)
        if not shard_paths:
            print(f"no shards in {args.global_shards_dir}", file=sys.stderr)
            return 2

        per_shard: list[dict] = []
        totals = {
            "scanned": 0,
            "distilled": 0,
            "skipped_existing": 0,
            "skipped_cold": 0,
            "skipped_empty": 0,
        }
        for sp in shard_paths:
            conn = connect(sp)
            try:
                r = distill_existing(
                    conn,
                    distiller,
                    kind=args.kind,
                    source_type=args.source_type,
                    limit=args.limit,
                    chunker_name=args.chunker,
                    batch_size=args.batch_size,
                )
            finally:
                conn.close()
            per_shard.append({"shard": sp.name, **r})
            for k in totals:
                totals[k] += r[k]
        print(json.dumps({**totals, "shards": per_shard}, indent=2, ensure_ascii=False))
        return 0

    conn = connect(args.db)
    try:
        result = distill_existing(
            conn,
            distiller,
            kind=args.kind,
            source_type=args.source_type,
            limit=args.limit,
            chunker_name=args.chunker,
            batch_size=args.batch_size,
        )
    finally:
        conn.close()
    print(json.dumps(result, indent=2, ensure_ascii=False))
    return 0


def _cmd_ask(args: argparse.Namespace) -> int:
    """Ask a question against one document; verifier classifies the answer."""
    import os

    from arborist.qa import ask
    from arborist.qa.client import OpenAICompatibleClient, StubClient

    base_url = args.endpoint or os.environ.get(
        "ARBORIST_LLM_ENDPOINT", "https://hermes.ai.unturf.com/v1"
    )
    model = args.model or os.environ.get(
        "ARBORIST_LLM_MODEL",
        "adamo1139/Hermes-3-Llama-3.1-8B-FP8-Dynamic",
    )
    revision = os.environ.get("ARBORIST_LLM_REVISION", "")
    quantization = os.environ.get("ARBORIST_LLM_QUANTIZATION", "fp8-dynamic")
    api_key = os.environ.get("ARBORIST_LLM_API_KEY")

    client: object
    if args.dry_run:
        client = StubClient(
            answer=f"[STUB] would have answered '{args.question}' against root {args.document_root[:16]}…"
        )
    else:
        client = OpenAICompatibleClient(base_url=base_url, api_key=api_key)

    conn = (
        connect_query(args.db, shards_dir=args.global_shards_dir)
        if args.global_shards_dir
        else connect(args.db)
    )
    # Per-call policy override for --answer-mode. Other knobs flow from
    # DEFAULT_POLICY.
    from arborist.qa.runner import DEFAULT_POLICY as _DEFAULT_ASK_POLICY
    call_policy = dict(_DEFAULT_ASK_POLICY)
    if getattr(args, "answer_mode", None):
        call_policy["answer_mode"] = args.answer_mode
    try:
        result = ask(
            conn,
            document_root=args.document_root,
            question=args.question,
            client=client,
            model_id=model,
            revision=revision,
            quantization=quantization,
            policy=call_policy,
        )
    finally:
        conn.close()
    print(json.dumps(result, indent=2, ensure_ascii=False))
    return 0 if result.get("status") in ("cache_hit", "cache_miss_then_written") else 1


def _cmd_query(args: argparse.Namespace) -> int:
    """Multi-source RAG: question -> top-K corpus docs -> Hermes -> cache."""
    import os

    from arborist.qa.client import OpenAICompatibleClient, StubClient
    from arborist.qa.progress import from_env as _progress_from_env
    from arborist.qa.query import query

    base_url = args.endpoint or os.environ.get(
        "ARBORIST_LLM_ENDPOINT", "https://hermes.ai.unturf.com/v1"
    )
    model = args.model or os.environ.get(
        "ARBORIST_LLM_MODEL",
        "adamo1139/Hermes-3-Llama-3.1-8B-FP8-Dynamic",
    )
    revision = os.environ.get("ARBORIST_LLM_REVISION", "")
    quantization = os.environ.get("ARBORIST_LLM_QUANTIZATION", "fp8-dynamic")
    api_key = os.environ.get("ARBORIST_LLM_API_KEY")

    client: object
    if args.dry_run:
        client = StubClient(
            answer="[STUB] dry-run: would have asked Hermes-3 with the assembled context."
        )
    else:
        client = OpenAICompatibleClient(base_url=base_url, api_key=api_key)

    qa_db = args.qa_db
    if qa_db is None:
        if args.global_shards_dir:
            qa_db = Path(args.global_shards_dir) / "qa.db"
        else:
            qa_db = Path.home() / ".arborist" / "qa.db"
    qa_db = Path(qa_db)

    shards_dir = (
        Path(args.global_shards_dir) if args.global_shards_dir else None
    )
    single_db = None if shards_dir else args.db

    # Apply per-call policy overrides (question_dedup, repair, answer_mode)
    # on top of the default. fidelity is a function-level kwarg, not in
    # the policy.
    from arborist.qa.query import DEFAULT_QUERY_POLICY
    call_policy = dict(DEFAULT_QUERY_POLICY)
    if getattr(args, "question_dedup", None):
        call_policy["question_dedup"] = args.question_dedup
    if getattr(args, "answer_mode", None):
        call_policy["answer_mode"] = args.answer_mode
    if getattr(args, "repair", False):
        # Mechanical-only repair when --repair is set; --repair-reprompts
        # adds the optional re-prompt tier on top. Both default off so
        # `make query` stays single-shot unless a knob is flipped.
        call_policy["repair_enabled"] = True
        call_policy["repair_max_reprompts"] = max(
            0, int(getattr(args, "repair_reprompts", 0))
        )
    # Ticket #000008 Phase 4 — quantifier-guard CLI overrides.
    # Six-level disable hierarchy at Levels 2 (per-call CLI flag)
    # via these flags; Level 3 policy fields are reachable via the
    # underlying policy dict.
    if getattr(args, "no_quantifier_guard", False):
        call_policy["quantifier_guard_enabled"] = False
    if getattr(args, "allow_broad", False):
        # Keeps the classifier on (telemetry stays useful) but
        # zeroes out the apply_caps gate so broad shapes don't
        # get clipped during emergent search.
        call_policy["quantifier_guard_apply_caps"] = False
    if getattr(args, "reject_broad", False):
        # Phase 4 reject-broad: the actual rejection happens inside
        # query() via the policy field; this CLI flag just sets the
        # field. See arborist/qa/query.py for the early-return path.
        call_policy["quantifier_reject_broad"] = True
    if getattr(args, "apply_quantifier_caps", False):
        # Operator opts in to flipping the dry-run gate per-call.
        # Bench-first per §10.11.3 — this flag is the path from
        # dry-run to live-cap.
        call_policy["quantifier_guard_apply_caps"] = True
    if getattr(args, "crosslang_guard", False):
        # Ticket #000001 §7 Phase 0 — opt in to the cross-language
        # guard per-call (default OFF). Pure policy-field set; the
        # behaviour lives in query() (crosslang.guard + the two seams).
        call_policy["crosslang_guard_enabled"] = True
    if getattr(args, "crosslang_translate", False):
        # Ticket #000056 — Operation Sandwich implies the Phase-0
        # guard (it rides the same non-English signal).
        call_policy["crosslang_guard_enabled"] = True
        call_policy["crosslang_translate_enabled"] = True
    # Ticket #000010 — meta-cognition CLI overrides.
    if getattr(args, "no_preflight", False):
        call_policy["metacognition_enabled"] = False
    if getattr(args, "block_on_contradiction", False):
        # Strict mode: hard-block on lexical contradictions instead
        # of label-only.
        call_policy["metacognition_block_on_contradiction"] = True
    if getattr(args, "soft_preflight", False):
        # Ticket #000011 — opt-in to model-assisted soft preflight
        # sidecar. Adds one short LLM round-trip; NEVER gates
        # admissibility (D1 preserved).
        call_policy["soft_preflight_enabled"] = True
    # Ticket #000028 — multi-modality witness override. CLI > policy.
    _witness_override = getattr(args, "witness_override", None)
    if _witness_override == "on":
        call_policy["canonical_witness_enabled"] = True
    elif _witness_override == "off":
        call_policy["canonical_witness_enabled"] = False
    if getattr(args, "no_canonical_preflight", False):
        # Disable the math/logic π* short-circuit per-call. Forces
        # RAG even on pure-arithmetic / pure-propositional input —
        # useful when bench-comparing against the LLM path.
        call_policy["canonical_projection_preflight"] = False

    progress = _progress_from_env(
        cli_override=getattr(args, "progress_override", None),
    )

    result = query(
        question=args.question,
        qa_db=qa_db,
        chat_client=client,
        model_id=model,
        revision=revision,
        quantization=quantization,
        shards_dir=shards_dir,
        single_db=single_db,
        top_k=args.top_k,
        over_fetch=args.over_fetch,
        max_context_chars=args.max_context_chars,
        policy=call_policy,
        fidelity=getattr(args, "fidelity", None),
        burn_existing=bool(getattr(args, "burn", False)),
        retrieval_keywords=getattr(args, "retrieval_keywords", None),
        progress=progress,
    )

    # Emit unfirehose-compatible session journal. One JSONL file per
    # `make query` invocation, written to ~/.arborist/unfirehose/{slug}/
    # {session_uuid}.jsonl. Unfirehose's native-harness watcher picks
    # this up automatically (no registration). Failures here must NEVER
    # break the query path — wrap in a broad except & swallow.
    try:
        _emit_query_journal(args.question, result, model)
    except Exception:  # pragma: no cover — best-effort journaling
        pass

    # Ticket #000031 Phase 3 — stash shards_dir on the result dict
    # so the render-layer warrant-chain tail can look up
    # warrant-resolver derivations without needing args. Stripped
    # before JSON output to keep the json shape stable.
    _shards_dir = args.global_shards_dir or args.shards_dir
    if _shards_dir:
        result["_shards_dir"] = str(_shards_dir)

    if args.json:
        # Don't leak the internal-only `_shards_dir` field into JSON
        # output — strip it before serialize.
        json_result = {k: v for k, v in result.items() if not k.startswith("_")}
        print(json.dumps(json_result, indent=2, ensure_ascii=False))
    else:
        print(_render_query_human(result, args.question))
    return (
        0
        if result.get("status") in (
            "cache_hit",
            "cache_miss_then_written",
            "canonical_projection",
        )
        else 1
    )


def _emit_query_journal(question: str, result: dict, model: str) -> None:
    """Write one unfirehose/1.0 session for this query invocation."""
    from arborist.journal import SessionWriter
    timings = result.get("timings") or {}
    answer = result.get("answer_text") or ""
    arborist_meta = {
        "audit_mode": result.get("audit_mode"),
        "verifier_method": result.get("verifier_method"),
        "n_quotes": result.get("n_quotes"),
        "n_verified": result.get("n_verified"),
        "cache_key": result.get("cache_key"),
        "cache_status": result.get("status"),
        "lookup_path": result.get("lookup_path"),
        "violations": [
            {"kind": v.get("kind"), "rationale": (v.get("rationale") or "")[:160]}
            for v in (result.get("violations") or [])
        ],
        "sources": [
            {"title": s.get("title"), "uri": s.get("document_uri"), "used": s.get("used"), "role": s.get("source_role")}
            for s in (result.get("sources") or [])
        ],
        "timings_ms": timings,
        "answer_mode": (result.get("policy") or {}).get("answer_mode"),
    }
    with SessionWriter(first_prompt=question) as s:
        s.user_message(question)
        s.assistant_message(
            answer,
            model=model,
            provider="hermes",
            stop_reason="end_turn",
            duration_ms=int(timings.get("total_ms") or 0) or None,
            arborist_meta=arborist_meta,
        )


# Soft-demote violation kinds that demote STRICT to HYBRID without
# rejecting the pointer outright. WARRANT_MISSING / TITLE_MISMATCH /
# DEFLECTION_DETECTED handled separately as hard demotes (their
# presence determines POINTER-LINKED vs ANCHOR-WARRANTED).
_SOFT_DEMOTE_VIOLATION_KINDS = frozenset({
    "LAZY_ANCHOR_DEMOTED",
    "POINTER_OVERFLOW_TRIMMED",
    "TOO_MANY_CLAIMS",
    "BARE_NAME_CLAIM",
    # FORMAT_COLLAPSED — model abandoned the claim_lattice_pointer
    # protocol (multi-line prose, zero [E\d+] tags). Soft-demotes
    # STRICT → HYBRID; pairs with the bottom UNGROUNDED rung when the
    # parser found nothing groundable, but at least surfaces the
    # collapse cause to the operator at audit-line glance.
    "FORMAT_COLLAPSED",
    # Ticket #000008 Phase 4 — broad-quantifier soft-demotes (§10.3).
    # Per §10.3 these stay as soft demotes (cap at ANCHOR-WARRANTED)
    # rather than minting a new audit_mode token. The audit-line tail
    # (rendered by _render_warrant_tail) names which one fired so an
    # operator can tell at a glance.
    "BROAD_QUANTIFIER_RUNAWAY",      # raw_line_count >> pointer_count
    "BROAD_QUANTIFIER_CAP_APPLIED",  # preflight cap fired below default
    "BROAD_QUANTIFIER_SCOPE_UNBOUND", # unbounded universal reached the LLM
    # BROAD_QUANTIFIER_REJECTED is a HARD demote (UNGROUNDED via
    # early-return) — listed here for completeness but doesn't
    # belong in the soft-demote set.
})


def _ladder_rung_for_lattice(audit_mode: str, violations: list[dict] | None) -> str:
    """Map (audit_mode, violations) → four-rung ladder for claim-lattice
    methods. Renderer-only transformation; schema column unchanged.

    The ladder names a strictly stronger property at each rung:

        POINTER-LINKED       pointer/source/chunk verified;
                             warrant either didn't apply or failed
        ANCHOR-WARRANTED     pointer-linked AND cited evidence contains
                             required anchors (warrant ran & passed);
                             other soft-demote violations may be present
        EVIDENCE-WARRANTED   anchor-warranted AND no soft demotes
        UNGROUNDED           n_verified == 0 (existing audit_mode)

    HYBRID adds `-PARTIAL` suffix to whichever rung applies.

    The rung logic uses the violations list as the signal — no new
    verifier output field needed. Three discriminators:
      - WARRANT_MISSING in violations → warrant ran and failed at
        least one claim → POINTER-LINKED (pointer ok but warrant
        didn't anchor the claim)
      - any soft-demote kind in violations → ANCHOR-WARRANTED (the
        rung is reached but other demotes pulled it back from STRICT)
      - no warrant miss AND no soft demotes → EVIDENCE-WARRANTED
    """
    if audit_mode == "UNGROUNDED":
        return "UNGROUNDED"
    kinds = {v.get("kind") for v in (violations or [])}
    # Hard demotes: WARRANT_MISSING, TITLE_MISMATCH, DEFLECTION_DETECTED
    # all indicate the citation is structurally misaligned with the
    # claim or the answer is structurally off-topic — pointer resolved
    # but the cited evidence (span, source, or whole answer) doesn't
    # actually support the user's question. All three drop to
    # POINTER-LINKED. (DEFLECTION_DETECTED was previously a soft demote
    # to ANCHOR-WARRANTED; the comeliness/fetish/investitures emergent
    # case showed that "the model totally shifted topic but anchored
    # the new topic" earned ANCHOR-WARRANTED unfairly. Off-topic
    # belongs at the lower rung.)
    if (
        "WARRANT_MISSING" in kinds
        or "TITLE_MISMATCH" in kinds
        or "DEFLECTION_DETECTED" in kinds
    ):
        rung = "POINTER-LINKED"
    elif kinds & _SOFT_DEMOTE_VIOLATION_KINDS:
        rung = "ANCHOR-WARRANTED"
    else:
        rung = "EVIDENCE-WARRANTED"
    if audit_mode == "HYBRID":
        rung = rung + "-PARTIAL"
    return rung


def _render_audit_label(
    audit_mode: str,
    verifier_method: str,
    violations: list[dict] | None = None,
) -> str:
    """Map (audit_mode, verifier_method, violations) → human-readable
    display label.

    Schema-level audit_mode names what the lexical verifier could
    confirm; the display label names what THAT means in honesty
    terms. STRICT in claim_lattice mode = "every pointer resolves
    to a valid evidence object whose source_role is allowed AND
    every cited span passes the citation-overlap coverage check."
    That is NOT full semantic entailment. The display label spells
    out the actual property so users don't read STRICT as "the
    answer is correct."

    Four-rung ladder for claim-lattice methods (per ticket #000005):

        POINTER-LINKED       pointer verified; warrant either didn't
                             apply or failed for some claim
        ANCHOR-WARRANTED     pointer-linked + warrant passed where
                             it ran; other soft demotes may apply
        EVIDENCE-WARRANTED   anchor-warranted + no soft demotes
        UNGROUNDED           no verified pairs

    HYBRID audits get a `-PARTIAL` suffix on whichever rung applies.

    Plus the verifier-method tail (`· via claim_lattice`,
    `· via claim_lattice_pointer`) so the user can see WHICH
    verifier path produced the verdict.

    For quote / span / entity / paraphrase mode the audit_mode
    labels carry less risk of overclaiming (they verify against
    pinned spans, not synthesis) — keep them as-is.

    `violations` defaults to None for backward compatibility with
    callers that don't have access to the violation list. With
    None, the ladder falls back to EVIDENCE-WARRANTED (the most
    optimistic rung) — operators get the same surface as before
    until callers thread violations through.
    """
    is_claim_lattice = verifier_method.startswith("claim_lattice")
    if is_claim_lattice:
        rung = _ladder_rung_for_lattice(audit_mode, violations)
        return f"{rung} · via {verifier_method}"
    if verifier_method == "canonical_projection":
        # Display label for #000027 — persisted canonical answers
        # (math/logic π* outputs). The line-rendering site appends
        # pi_star_ref as a tail since this helper doesn't have it.
        return "CANONICAL · via canonical_projection"
    # Quote / span / entity / paraphrase: keep audit_mode as the
    # primary token; append method for clarity.
    return f"{audit_mode} · via {verifier_method}"


def _render_warrant_tail(result: dict) -> str:
    """Append a tail to the audit-line label that names the specific
    warrant failure mode when one fired. Surfaces the cap reason at
    the user-facing layer without overloading audit_mode.

    Two failure modes today:
      - WARRANT_MISSING: relation/date/etc. anchor extracted from
        claim doesn't appear in any cited span (per-span check).
      - TITLE_MISMATCH: cited evidence's source title shares no
        content tokens with the claim (per-source check). 2026-05-02
        spin-glass case: claim about spin glass cited to *Quantum
        chromodynamics*.

    When both fire on different claims of the same answer, surface
    both tails so the operator sees the full picture."""
    violations = result.get("violations") or []
    kinds = {v.get("kind") for v in violations}
    parts: list[str] = []
    if "WARRANT_MISSING" in kinds:
        parts.append("warrant missing")
    # Phase 3 of #000031 — positive signal when a per-claim
    # warrant_check would have failed but a Merkle-bound warrant
    # chain to a primary-source surface exists (cited chunk's
    # document is a claim-pack record with a warrant-resolver
    # derivation row). Distinct from the source-level
    # `_render_warrant_chain_tail` which counts cited SOURCES with
    # chains; this counts CLAIMS that survived because of a chain.
    proven_idxs = result.get("warrant_proven_claim_idxs") or []
    if proven_idxs:
        n = len(proven_idxs)
        parts.append(f"warrant proven via chain ×{n}")
    if "TITLE_MISMATCH" in kinds:
        parts.append("title mismatch")
    if "FORMAT_COLLAPSED" in kinds:
        parts.append("format collapsed")
    # Ticket #000008 Phase 4 — broad-quantifier tails (§10.3 / §10.7).
    # Each names what the preflight detected so operators don't have to
    # parse violation lists by hand. Cap value comes from
    # `claim_cap_applied` on the result when present.
    if "BROAD_QUANTIFIER_REJECTED" in kinds:
        parts.append("broad rejected")
    elif "BROAD_QUANTIFIER_CAP_APPLIED" in kinds:
        cap = result.get("claim_cap_applied")
        parts.append(f"broad cap {cap}" if cap else "broad cap")
    elif "BROAD_QUANTIFIER_SCOPE_UNBOUND" in kinds:
        parts.append("broad unbounded")
    elif "BROAD_QUANTIFIER_RUNAWAY" in kinds:
        parts.append("broad runaway")
    # Ticket #000010 — meta-cognition logical-status tails. Pulled
    # from result["question_state"]["logical_statuses"] when present.
    # Doesn't double up with the broad-quantifier tails above (those
    # come from the verifier violation list, not the preflight).
    qs = result.get("question_state") or {}
    statuses = set(qs.get("logical_statuses") or [])
    if "false_premise_suspected" in statuses:
        parts.append("false premise")
    if "contradictory_question" in statuses:
        parts.append("contradictory")
    if "stale_risk" in statuses:
        parts.append("stale risk")
    if "out_of_corpus_risk" in statuses:
        parts.append("out of corpus")
    if "reference_frame_ambiguous" in statuses:
        parts.append("frame ambiguous")
    # Ticket #000011 — soft preflight sidecar hint. Renders distinctly
    # from the hard tails above so an operator can tell at a glance
    # that the signal is advisory. Skips SOFT_DISABLED / SOFT_PARSE_FAIL
    # / SOFT_WELL_FORMED (no actionable signal).
    soft = result.get("soft_preflight_hint") or {}
    soft_label = soft.get("classifier_label") or ""
    if soft_label and soft_label not in (
        "SOFT_DISABLED", "SOFT_PARSE_FAIL", "SOFT_WELL_FORMED",
    ):
        # Strip SOFT_ prefix + lowercase for tail readability
        # (e.g. SOFT_FALSE_PREMISE_SUSPECTED → "false premise suspected").
        readable = soft_label.removeprefix("SOFT_").lower().replace("_", " ")
        parts.append(f"soft: {readable}")
    # Ticket #000026 Phase 3 — authorship warrant tail. Shows operator
    # the strongest tier the cited evidence supports for an
    # authorship-shaped question. Strong tiers (1-4) advertise the
    # win; weak tiers (5-6) flag that the warrant rests on shaky
    # evidence. NO_AUTHORSHIP_SIGNAL stays silent (sidecar's
    # "doesn't apply" verdict).
    authorship = result.get("authorship") or {}
    auth_tier = authorship.get("tier")
    if auth_tier and auth_tier != "NO_AUTHORSHIP_SIGNAL":
        # Strip AUTHOR_ prefix + lowercase for tail readability
        # (AUTHOR_COPYRIGHT_FOOTER → "copyright-footer").
        readable = auth_tier.removeprefix("AUTHOR_").lower().replace("_", "-")
        parts.append(f"warrant: {readable}")
    if not parts:
        return ""
    return " · " + " · ".join(parts)


def _maybe_render_json_envelope_as_bullets(answer: str) -> str:
    """If `answer` is a claim_lattice JSON envelope, render bullets.

    JSON-mode runs that land UNGROUNDED have no verified claims so the
    runtime's bullet renderer produces empty text and `answer_text`
    falls back to the raw model output — a `{"claims":[...]}` envelope.
    The user then sees raw JSON for failed runs and bullets for
    successful ones, which reads as inconsistent. Detect the JSON
    shape, parse it (lenient), and render each claim's `text` as a
    bullet line tagged with its evidence_ids so the surface stays
    consistent across grounded / ungrounded outcomes.

    Falls back to the raw input unchanged if:
      - input doesn't look like JSON (no leading `{`)
      - parse fails (lenient parser exception)
      - parse succeeds but the shape isn't `{"claims": [...]}`
    """
    stripped = (answer or "").lstrip()
    if not stripped.startswith("{") and not stripped.startswith("```"):
        return answer
    if "claims" not in stripped:
        return answer
    try:
        from arborist.qa.verify import _lenient_json_parse
        parsed, _fixups = _lenient_json_parse(answer)
    except Exception:
        return answer
    if not isinstance(parsed, dict):
        return answer
    raw_claims = parsed.get("claims")
    if not isinstance(raw_claims, list) or not raw_claims:
        return answer
    out_lines: list[str] = []
    for c in raw_claims:
        if not isinstance(c, dict):
            continue
        text = c.get("text") or ""
        if not isinstance(text, str) or not text.strip():
            continue
        eids = c.get("evidence_ids") or []
        if isinstance(eids, list) and eids:
            ids = ",".join(str(x) for x in eids if isinstance(x, str))
            out_lines.append(f"- {text.strip()}  [{ids}: unverified]")
        else:
            out_lines.append(f"- {text.strip()}")
    return "\n".join(out_lines) if out_lines else answer


def _render_query_human(result: dict, question: str) -> str:
    """Pretty-print a query result for terminal reading.

    Layout:
        question
        AUDIT_MODE  N/M verified  via verifier_method  Xs  (cached|fresh)

        answer text...

        sources (K):
          [1] Title — host/path  (shard.db)
          [2] ...

        unverified (J):
          - "..."

        cache_key: 35ab7d33…   <run with --json for full record>

    Errors / no-source paths fall back to a short status line.
    """
    status = result.get("status")
    if status == "broad_quantifier_rejected":
        # Phase 4 reject-broad early-return path. The result carries
        # an answer_text with the rejection rationale + a violations
        # list; render both so the operator sees WHY without --json.
        answer_text = result.get("answer_text") or ""
        violations = result.get("violations") or []
        kind = next(
            (v.get("kind") for v in violations
             if v.get("kind") == "BROAD_QUANTIFIER_REJECTED"),
            "BROAD_QUANTIFIER_REJECTED",
        )
        intensity = result.get("quantifier_intensity") or "?"
        token = result.get("quantifier_matched_token") or "?"
        cap = result.get("claim_cap_applied")
        cap_str = f" · cap was {cap}" if cap else ""
        return (
            f"{question}\n"
            f"  UNGROUNDED · via {kind} · {intensity} (\"{token}\")"
            f"{cap_str}  0/0  0.0s  (preflight)\n\n"
            f"{answer_text}"
        )
    if status == "canonical_projection":
        # Math/logic π* short-circuited the RAG path — the canonical
        # bytes ARE the answer. No retrieval, no LLM, no cache.
        pi_star_ref = result.get("pi_star_ref", "?")
        answer_text = result.get("answer_text") or ""
        timings = result.get("timings") or {}
        total_ms = timings.get("total_ms")
        elapsed = (
            f"{total_ms / 1000:.1f}s"
            if isinstance(total_ms, (int, float))
            else "?"
        )
        # Ticket #000028 — render-layer witness tail. When witness ran,
        # surface the agreement label + per-modality status. Pure
        # render-only; cache_key / governance_policy_hash unaffected.
        witness = result.get("witness")
        witness_tail = ""
        witness_block = ""
        if witness:
            label = witness.get("agreement_label", "?")
            mods = witness.get("modalities") or {}
            ground_truth_hex = witness.get("canonical_answer_bytes_hex") or ""
            agree_count = sum(
                1 for name, m in mods.items()
                if name != "kernel"
                and m.get("ok")
                and (m.get("canonical_bytes_hex") or "") == ground_truth_hex
            )
            checkable = sum(
                1 for name, m in mods.items()
                if name != "kernel" and m.get("error") != "ABSENT"
            )
            witness_tail = f"  [{label} · {agree_count}/{checkable} modalities agree]"
            lines = ["", "witness:"]
            for name in ("kernel", "cache", "llm"):
                m = mods.get(name)
                if not m:
                    continue
                raw = m.get("raw_answer")
                err = m.get("error")
                ms = int(m.get("elapsed_ms") or 0)
                if err == "ABSENT":
                    detail = "absent"
                elif err == "PIS_REJECT":
                    detail = f'rejected: "{raw}"'
                elif err == "TIMEOUT":
                    detail = "timeout"
                elif err and err.startswith("LLM_ERROR"):
                    detail = err.lower()
                else:
                    detail = f'"{raw}"'
                lines.append(f"  {name:<7} {detail}  ({ms}ms)")
            witness_block = "\n".join(lines)
        return (
            f"{question}\n"
            f"  CANONICAL · via {pi_star_ref}{witness_tail}  {elapsed}  (projected)\n\n"
            f"{answer_text}"
            f"{witness_block}"
        )
    if status not in ("cache_hit", "cache_miss_then_written"):
        msg = result.get("msg") or status or "unknown error"
        return f"  {status or 'error'}: {msg}"

    audit = result.get("audit_mode", "?")
    n_quotes = result.get("n_quotes", 0) or 0
    n_verified = result.get("n_verified", 0) or 0
    method = result.get("verifier_method", "?")
    timings = result.get("timings") or {}
    total_ms = timings.get("total_ms")
    elapsed = f"{total_ms / 1000:.1f}s" if isinstance(total_ms, (int, float)) else "?"
    cache_status = "cached" if status == "cache_hit" else "fresh"
    lookup_path = result.get("lookup_path")
    # Annotate cache_hits when they came from a fallback ckey, not the
    # primary one — useful when an agent ran with fidelity=equivalence_class
    # and reused another agent's record.
    if lookup_path and lookup_path.endswith("_fallback"):
        cache_status = f"cached via {lookup_path}"

    # Render-layer label honesty: schema audit_mode (STRICT/HYBRID/
    # UNGROUNDED) describes what the lexical verifier checked, not
    # full semantic entailment. The display label combines audit_mode
    # with verifier_method so the user sees what was actually
    # verified. Four-rung ladder for claim-lattice methods (#000005):
    #   POINTER-LINKED      pointer verified; warrant didn't apply
    #                       or failed for some claim
    #   ANCHOR-WARRANTED    pointer-linked + warrant passed where
    #                       it ran; other soft demotes may apply
    #   EVIDENCE-WARRANTED  anchor-warranted + no soft demotes
    #   UNGROUNDED          no verified pairs
    # Schema column stays unchanged; pure display.
    display_label = _render_audit_label(audit, method, result.get("violations"))
    warrant_tail = _render_warrant_tail(result)
    # Ticket #000031 Phase 3 — when a cited source has a warrant-
    # resolver derivation row tying it to a primary-source surface
    # chunk, surface a `· warrant: N proven` tail so the user sees
    # the chain-of-custody. Pure render layer; cache_key /
    # audit_mode / governance_policy_hash all unchanged. The
    # _shards_dir is stashed on the result dict by `_cmd_query`
    # before render time (the CLI has it; the renderer doesn't).
    warrant_chain_tail = _render_warrant_chain_tail(result)

    lines: list[str] = []
    lines.append(question)
    lines.append(
        f"  {display_label}{warrant_tail}{warrant_chain_tail}  {n_verified}/{n_quotes}  "
        f"{elapsed}  ({cache_status})"
    )
    lines.append("")

    answer = result.get("answer_text") or ""
    # When JSON-mode runs land UNGROUNDED, rendered_text is empty and
    # answer_text falls back to the raw model output — a JSON envelope.
    # Parse it and render each claim as a bullet so the user gets the
    # same shape whether the run grounded or not. Falls back to raw
    # display if parse fails or output isn't JSON-shaped.
    answer = _maybe_render_json_envelope_as_bullets(answer)
    lines.append(answer)
    lines.append("")

    sources = result.get("sources") or []
    if sources:
        lines.append(f"sources ({len(sources)}):")
        for i, s in enumerate(sources, start=1):
            uri = s.get("document_uri", "")
            title = (s.get("title") or "").strip() or _short_path(uri)
            shard = s.get("shard")
            shard_part = f"  ({shard})" if shard else ""
            # Render-layer source-role display + used/unused annotation
            # (claim_lattice modes only — quote-mode results don't carry
            # the per-source `used` flag). Honestly surfaces "the system
            # retrieved noise but did not rely on it" so the user can
            # see the model ignoring distractors instead of having to
            # infer it. Pre-2026-05-01 the source list showed every
            # retrieved doc indistinguishably.
            role = s.get("source_role")
            used = s.get("used")
            pointer_ids = s.get("used_pointer_ids") or []
            annotations: list[str] = []
            if role:
                annotations.append(role)
            if used is True:
                if pointer_ids:
                    annotations.append(f"used ({','.join(pointer_ids)})")
                else:
                    annotations.append("used")
            elif used is False:
                annotations.append("unused")
            if annotations:
                annotation_part = " — " + " — ".join(annotations)
            else:
                annotation_part = ""
            lines.append(
                f"  [{i}] {title}{annotation_part}{_strip_scheme(uri)}{shard_part}"
            )
        lines.append("")

        # Retrieval-purity one-line summary (claim_lattice modes only).
        # "primary at #R · used N/M sources · M-N noise unused"
        # Surfaces noise-resistance at a glance without making the
        # user count rows themselves.
        purity = result.get("retrieval_purity")
        if purity:
            primary_rank = purity.get("primary_rank") or 0
            used = purity.get("used_sources", 0)
            total = purity.get("total_sources", 0)
            noise_unused = (
                purity.get("noise_sources_count", 0)
                - purity.get("noise_sources_used", 0)
            )
            primary_part = (
                f"primary at #{primary_rank}"
                if primary_rank > 0 else "no primary in top-K"
            )
            noise_part = (
                f" · {noise_unused} noise unused" if noise_unused else ""
            )
            lines.append(
                f"  retrieval purity: {primary_part} · used "
                f"{used}/{total} sources{noise_part}"
            )
            lines.append("")

    partially = result.get("partially_verified_quotes") or []
    if partially:
        lines.append(f"partially grounded ({len(partially)}):")
        for q in partially:
            qtxt = q if len(q) <= 100 else q[:97] + "..."
            lines.append(f'  - "{qtxt}"')
        lines.append("")

    unverified = result.get("unverified_quotes") or []
    if unverified:
        lines.append(f"unverified ({len(unverified)}):")
        for q in unverified:
            qtxt = q if len(q) <= 100 else q[:97] + "..."
            lines.append(f'  - "{qtxt}"')
        lines.append("")

    # Anchor-smell sidecar (claim_lattice mode only). Soft signal —
    # never persisted, never in cache_key. Surfaces when ≥50% of
    # verified claim-pointer pairs share one pointer_id AND there
    # are at least 3 verified pairs to compare; below 3 the ratio is
    # vacuous (1/1 always = 1.00 even when nothing is wrong).
    ratio = result.get("lazy_anchor_ratio")
    distribution = result.get("pointer_id_distribution") or {}
    total_pairs = sum(distribution.values()) if distribution else 0
    if (
        method == "claim_lattice"
        and isinstance(ratio, (int, float))
        and ratio >= 0.5
        and total_pairs >= 3
    ):
        top_pid, top_count = max(distribution.items(), key=lambda kv: kv[1])
        lines.append(
            f"lazy-anchor smell: {top_count} of {total_pairs} verified "
            f"pairs cite [{top_pid}] (ratio {ratio:.2f}); "
            f"distinct pointers cited: {len(distribution)}"
        )
        lines.append("")

    pc = result.get("prompt_chars") or {}
    if pc:
        # Compact one-liner — operator at a glance: did STRICT come
        # from a tight prompt or a context-stuffed one?
        lines.append(
            f"capacity: prompt {pc.get('messages_total', 0):,} chars "
            f"(sys {pc.get('system_prompt', 0):,} + "
            f"reminder {pc.get('grounding_reminder', 0):,} + "
            f"evidence {pc.get('evidence_or_context', 0):,} + "
            f"question {pc.get('user_question', 0):,}) → "
            f"answer {result.get('answer_chars', 0):,} chars"
        )

    # Per-phase timings — surfaces where the per-call cost lands.
    # Hermes-bound queries should show `llm_ms` dominating; if
    # search_ms or persist_ms creeps up that's a retrieval / WAL
    # signal the operator wants visible. Cache-hit rows skip llm
    # entirely so the breakdown also tells you which path you're
    # paying for.
    timings = result.get("timings") or {}
    if timings:
        parts: list[str] = []
        order = (
            ("cache_lookup_ms", "cache"),
            ("search_ms", "search"),
            ("context_ms", "context"),
            ("llm_ms", "llm"),
            ("persist_ms", "persist"),
        )
        for key, label in order:
            v = timings.get(key)
            if isinstance(v, (int, float)) and v > 0:
                parts.append(f"{label} {v / 1000:.2f}s")
        total = timings.get("total_ms")
        if isinstance(total, (int, float)):
            parts.append(f"**total {total / 1000:.2f}s**")
        if parts:
            lines.append("timings: " + " · ".join(parts))

    # Reference-frame notes (Ticket #000002). When detect_frame
    # classified the query as `reference`, surface the named work
    # so an operator knows the substrate routed to a fictional
    # source. Skipped for literal / no-phrase-route / ambiguous
    # rows — the line only appears when there's something to say.
    fd = result.get("frame_detection") or {}
    if fd.get("kind") == "reference" and fd.get("reference_title"):
        lines.append("")
        lines.append(f"reference frame: {fd['reference_title']}")
        if fd.get("reference_uri"):
            lines.append(f"  cited as the named work in the answer")

    cache_key = (result.get("cache_key") or "")[:8]
    lines.append(f"cache_key: {cache_key}…   <run with --json for full record>")
    return "\n".join(lines)


def _strip_scheme(uri: str) -> str:
    """`https://en.wikipedia.org/wiki/X` -> `en.wikipedia.org/wiki/X`."""
    for prefix in ("https://", "http://"):
        if uri.startswith(prefix):
            return uri[len(prefix):]
    return uri


def _short_path(uri: str) -> str:
    """Last URL segment as a fallback display name."""
    s = _strip_scheme(uri).rstrip("/")
    if "/" in s:
        return s.rsplit("/", 1)[1]
    return s


def _cmd_inspect(args: argparse.Namespace) -> int:
    """Sidecar diagnostic — pulls source chunks for a cache_key and
    classifies each unverified span. Read-only; no audit events, no
    providence_cache mutations.
    """
    from arborist.qa.inspect import inspect_cache_key

    qa_db = args.qa_db
    if qa_db is None:
        qa_db = (
            Path(args.global_shards_dir) / "qa.db"
            if args.global_shards_dir
            else Path.home() / ".arborist" / "qa.db"
        )
    shards_dir = (
        Path(args.global_shards_dir) if args.global_shards_dir else None
    )
    single_db = None if shards_dir else Path(args.db) if args.db else None

    result = inspect_cache_key(
        args.cache_key,
        qa_db=Path(qa_db),
        shards_dir=shards_dir,
        single_db=single_db,
    )

    if args.json:
        print(json.dumps(result, indent=2, ensure_ascii=False))
    else:
        print(_render_inspect_human(result))
    return 0 if result.get("status") == "ok" else 1


def _render_inspect_human(result: dict) -> str:
    """Pretty-print an inspect result so an operator can scan
    paraphrase vs invention vs trailing-artifact at a glance."""
    if result.get("status") != "ok":
        return f"  {result.get('status', 'error')}: cache_key={result.get('cache_key', '?')}"

    rec = result["record"]
    ctx = result["context"]
    sources = result["sources"]
    diagnoses = result["unverified"]

    lines: list[str] = []
    lines.append(rec["question_text"])
    lines.append(
        f"  {rec['audit_mode']}  {rec['n_verified']}/{rec['n_quotes']} verified  "
        f"via {rec['verifier_method']}  state={rec['falsification_state']}"
    )
    lines.append("")
    lines.append(
        f"context: {ctx['raw_chars']:,} raw -> {ctx['base_chars']:,} base "
        f"(wikitext-strip {'on' if ctx['wikitext_strip_active'] else 'off'})"
    )
    if sources:
        lines.append(f"sources ({len(sources)}):")
        for i, s in enumerate(sources, start=1):
            lines.append(
                f"  [{i}] {s.get('title') or '(untitled)'} — "
                f"{s.get('chunk_count', '?')} chunks, "
                f"{s.get('raw_chars', 0):,} chars"
            )
    lines.append("")

    coh = result.get("coherence") or {}
    if coh.get("kind") not in (None, "ok", "empty"):
        ev = coh.get("evidence") or {}
        if coh["kind"] == "phrase_component_reuse":
            detail = f" (reuses {ev.get('reused_tokens')} from a quoted phrase)"
        else:  # circular | vacuous
            detail = f" (subject={ev.get('subject_tokens')})"
        lines.append(f"  · incoherent: {coh['kind']}{detail}")
        lines.append(f"      sentence: {_short(ev.get('sentence', ''), 140)}")
        lines.append("")

    if not diagnoses:
        lines.append("(no unverified spans)")
        return "\n".join(lines)

    lines.append(f"unverified diagnoses ({len(diagnoses)}):")
    for i, d in enumerate(diagnoses, start=1):
        span = d.get("span", "")
        diag = d.get("diagnosis", "?")
        lines.append("")
        lines.append(f"  [{i}] {diag}")
        lines.append(f"      span: {_short(span, 140)}")
        if diag == "trailing_artifact":
            lines.append(f"      matched_prefix_chars: {d.get('matched_prefix_chars')}")
            lines.append(f"      trailing_artifact: {_short(d.get('trailing_artifact', ''), 100)}")
        elif diag == "synthetic_elision_inside_quote":
            lines.append(
                f"      [...] inserted by model — "
                f"{d.get('prefix_chars', 0)} prefix chars "
                f"({'in source' if d.get('prefix_in_source') else 'NOT in source'}), "
                f"{d.get('suffix_chars', 0)} suffix chars "
                f"({'in source' if d.get('suffix_in_source') else 'NOT in source'})"
            )
        elif diag == "interior_elision":
            lines.append(
                f"      matched: {d.get('matched_prefix_chars')} prefix + "
                f"{d.get('matched_suffix_chars')} suffix chars (parenthetical aside dropped)"
            )
            lines.append(f"      dropped_aside: {_short(d.get('dropped_aside', ''), 120)}")
        elif diag in ("paraphrase", "partial_paraphrase"):
            lines.append(f"      token_coverage: {d.get('token_coverage')}")
            counts = d.get("token_counts", {})
            if counts:
                top = ", ".join(f"{k}×{v}" for k, v in list(counts.items())[:6])
                lines.append(f"      tokens_in_base: {top}")
            missing = d.get("missing_tokens", [])
            if missing:
                lines.append(f"      missing_tokens: {missing[:8]}")
        elif diag == "no_overlap":
            lines.append(f"      tokens_checked: {d.get('tokens_checked', '?')}")
            lines.append(f"      tokens_present: {d.get('tokens_present', '?')}")
        repair = d.get("repair")
        if repair:
            action = repair.get("action", "?")
            reason = repair.get("reason", "")
            line = f"      repair: {action}"
            if reason:
                line += f"  ({reason})"
            lines.append(line)
    return "\n".join(lines)


def _short(s: str, n: int) -> str:
    """Truncate string to n chars with ellipsis."""
    return s if len(s) <= n else s[: n - 3] + "..."


def _cmd_losses(args: argparse.Namespace) -> int:
    """Sidecar diagnostic: list adapter_loss_reports rows.

    Read-only. Filters: --document-root, --chunk-id, --kind, --stage.
    With --summary, aggregates per (stage, loss_kind) for a single
    document. See ticket #000022 §3.6.
    """
    conn = (
        connect_query(args.db, shards_dir=args.global_shards_dir)
        if args.global_shards_dir
        else connect(args.db)
    )
    try:
        where: list[str] = []
        params: list = []
        if args.document_root:
            where.append("document_root = ?")
            params.append(args.document_root)
        if args.chunk_id is not None:
            where.append("chunk_id = ?")
            params.append(args.chunk_id)
        if args.kind:
            where.append("loss_kind = ?")
            params.append(args.kind)
        if args.stage:
            where.append("stage = ?")
            params.append(args.stage)
        clause = (" WHERE " + " AND ".join(where)) if where else ""

        if args.summary:
            sql = (
                "SELECT stage, loss_kind, loss_mode, "
                "       SUM(bytes_dropped) AS total_bytes, "
                "       SUM(occurrence_count) AS total_occ, "
                "       COUNT(*) AS row_count "
                "FROM adapter_loss_reports"
                + clause +
                " GROUP BY stage, loss_kind, loss_mode "
                "ORDER BY stage, total_bytes DESC"
            )
            rows = conn.execute(sql, params).fetchall()
            if args.json:
                out = [dict(r) for r in rows]
                print(json.dumps(out, indent=2, ensure_ascii=False))
            else:
                if not rows:
                    print("(no loss rows match)")
                    return 0
                print(f"{'stage':<18} {'kind':<28} {'mode':<11} {'occ':>7} {'bytes':>10} {'rows':>6}")
                for r in rows:
                    print(
                        f"{r['stage']:<18} {r['loss_kind']:<28} "
                        f"{r['loss_mode']:<11} {r['total_occ']:>7,} "
                        f"{r['total_bytes']:>10,} {r['row_count']:>6,}"
                    )
            return 0

        sql = (
            "SELECT chunk_id, document_root, stage, canonicalization_version, "
            "       loss_kind, loss_mode, bytes_dropped, occurrence_count, "
            "       input_length_bytes, output_length_bytes, sample_excerpt, "
            "       sample_hash, adapter_name, adapter_version, "
            "       loss_report_policy_hash, created_at "
            "FROM adapter_loss_reports"
            + clause +
            " ORDER BY chunk_id, stage, loss_kind LIMIT ?"
        )
        params.append(args.limit)
        rows = conn.execute(sql, params).fetchall()
        if args.json:
            out = [dict(r) for r in rows]
            print(json.dumps(out, indent=2, ensure_ascii=False))
            return 0
        if not rows:
            print("(no loss rows match)")
            return 0
        for r in rows:
            print(
                f"chunk={r['chunk_id']} doc={r['document_root'][:12]}.. "
                f"stage={r['stage']} kind={r['loss_kind']} "
                f"mode={r['loss_mode']} occ={r['occurrence_count']} "
                f"bytes={r['bytes_dropped']}"
            )
            if r["sample_excerpt"]:
                print(f"  excerpt: {_short(r['sample_excerpt'], 120)!r}")
            elif r["sample_hash"]:
                print(f"  sample_hash: {r['sample_hash'][:16]}..")
        return 0
    finally:
        conn.close()


def _falsify_cache_key(
    cache_key_value: str,
    *,
    state: str,
    reason: str,
    by_actor: str,
    shards_dir: Path | None,
    db_path: Path | None,
) -> dict:
    """Mark a providence_cache record as failed/stale/quarantined across shards.

    Searches every shard for the cache_key (it lives in exactly one).
    Updates the row's falsification_state, appends a falsification log
    entry, and writes a 'falsify' audit event so the chain records the act.
    """
    import time as _time

    from arborist.store import append_audit, discover_shards, transaction

    if state not in ("failed", "stale", "quarantined"):
        return {"status": "invalid_state", "value": state}

    paths: list[Path] = (
        discover_shards(shards_dir) if shards_dir else [Path(db_path)]
    )

    for sp in paths:
        c = connect(sp)
        try:
            row = c.execute(
                "SELECT cache_key, falsification_state FROM providence_cache "
                "WHERE cache_key = ?",
                (cache_key_value,),
            ).fetchone()
            if row is None:
                continue
            now = int(_time.time())
            with transaction(c):
                event_hash = append_audit(
                    c,
                    event_type="falsify",
                    subject_root=cache_key_value,
                    body={
                        "cache_key": cache_key_value,
                        "from_state": row["falsification_state"],
                        "to_state": state,
                        "reason": reason,
                        "by_actor": by_actor,
                    },
                    ts=now,
                )
                c.execute(
                    "UPDATE providence_cache "
                    "SET falsification_state = ?, audit_event_hash = ? "
                    "WHERE cache_key = ?",
                    (state, event_hash, cache_key_value),
                )
                c.execute(
                    "INSERT INTO falsifications "
                    "(cache_key, state, reason, by_actor, at, audit_event_hash) "
                    "VALUES (?, ?, ?, ?, ?, ?)",
                    (cache_key_value, state, reason, by_actor, now, event_hash),
                )
            return {
                "status": "falsified",
                "cache_key": cache_key_value,
                "shard": sp.name,
                "from_state": row["falsification_state"],
                "to_state": state,
                "reason": reason,
                "by_actor": by_actor,
                "audit_event_hash": event_hash,
                "ts": now,
            }
        finally:
            c.close()

    return {"status": "not_found", "cache_key": cache_key_value}


def _burn_cache_key(
    cache_key_value: str,
    *,
    reason: str,
    by_actor: str,
    shards_dir: Path | None,
    db_path: Path | None,
    force: bool = False,
) -> dict:
    """Delete a providence_cache leaf, but only if it has no children.

    "Kindergarten of a tree's genesis" — early/scratch use. Falsify keeps
    history; burn removes the row. Children today = falsifications
    referencing this cache_key. If any exist, refuse without ``--force``.

    Always writes a 'providence_burn' audit event so the chain records
    that a leaf was removed and why. Use ``arborist providence --falsify``
    instead when downstream consumers may have built on this answer.
    """
    import time as _time

    from arborist.store import append_audit, discover_shards, transaction

    paths: list[Path] = (
        discover_shards(shards_dir) if shards_dir else [Path(db_path)]
    )

    for sp in paths:
        c = connect(sp)
        try:
            row = c.execute(
                "SELECT cache_key, audit_mode, n_verified, falsification_state, "
                " question_text FROM providence_cache WHERE cache_key = ?",
                (cache_key_value,),
            ).fetchone()
            if row is None:
                continue
            child_falsifications = c.execute(
                "SELECT COUNT(*) FROM falsifications WHERE cache_key = ?",
                (cache_key_value,),
            ).fetchone()[0]
            if child_falsifications > 0 and not force:
                return {
                    "status": "refused_has_children",
                    "cache_key": cache_key_value,
                    "shard": sp.name,
                    "child_falsifications": int(child_falsifications),
                    "hint": "use --force to burn anyway, or 'providence --falsify' to keep history",
                }
            now = int(_time.time())
            # DELETE + burn audit event are one atomic unit — never a burned
            # cache row without its audit event, never the reverse.
            with transaction(c):
                c.execute(
                    "DELETE FROM providence_cache WHERE cache_key = ?",
                    (cache_key_value,),
                )
                event_hash = append_audit(
                    c,
                    event_type="providence_burn",
                    subject_root=cache_key_value,
                    body={
                        "cache_key": cache_key_value,
                        "burned_audit_mode": row["audit_mode"],
                        "burned_n_verified": int(row["n_verified"]),
                        "burned_state": row["falsification_state"],
                        "question_text": row["question_text"],
                        "reason": reason,
                        "by_actor": by_actor,
                        "child_falsifications_at_burn": int(child_falsifications),
                        "forced": bool(child_falsifications > 0 and force),
                    },
                    ts=now,
                )
            return {
                "status": "burned",
                "cache_key": cache_key_value,
                "shard": sp.name,
                "burned_audit_mode": row["audit_mode"],
                "reason": reason,
                "by_actor": by_actor,
                "audit_event_hash": event_hash,
                "ts": now,
            }
        finally:
            c.close()

    return {"status": "not_found", "cache_key": cache_key_value}


def _count_document_children(c, document_root: str) -> dict:
    """Count outbound child references that 'burn' must protect.

    For a document/core leaf, "children" = anything downstream that built on
    this row. Specifically:
      - derivations rows where ``src_root = root`` (a core was distilled
        from this — burning would orphan or silently cascade-truncate the
        derivation, leaving the descendant core dangling).
      - edges rows where ``dst_root = root`` (other documents link to this
        one; burning leaves dangling references).
      - providence_cache rows where ``source_root = root`` (Q&A grounded
        in this document).

    NOTE on schema: derivations has ON DELETE CASCADE on BOTH ``core_root``
    AND ``src_root``. Without this gate, a bare DELETE FROM documents would
    silently cascade-prune derivations and orphan downstream cores.
    """
    derivations_downstream = c.execute(
        "SELECT COUNT(*) FROM derivations WHERE src_root = ?",
        (document_root,),
    ).fetchone()[0]
    incoming_edges = c.execute(
        "SELECT COUNT(*) FROM edges WHERE dst_root = ?",
        (document_root,),
    ).fetchone()[0]
    providence_refs = c.execute(
        "SELECT COUNT(*) FROM providence_cache WHERE source_root = ?",
        (document_root,),
    ).fetchone()[0]
    return {
        "derivations_downstream": int(derivations_downstream),
        "incoming_edges": int(incoming_edges),
        "providence_refs": int(providence_refs),
    }


def _burn_document_root(
    document_root_value: str,
    *,
    reason: str,
    by_actor: str,
    shards_dir: Path | None,
    db_path: Path | None,
    force: bool = False,
) -> dict:
    """Delete a surface document leaf, but only if it has no children.

    Children:
      - derivations.src_root = root      (downstream cores derived from it)
      - edges.dst_root       = root      (other docs link to it)
      - providence_cache.source_root = root  (Q&A grounded in it)

    On burn:
      - DELETE FROM chunks_fts (FTS5 has no FK; clear before chunks vanish).
      - DELETE FROM documents — cascades to chunks + merkle_nodes via FK.
      - Append a 'document_burn' audit event recording counts + forced flag.

    Refuses with status='refused_has_children' (and skips the audit event)
    when any child count > 0 and ``--force`` is not set, so callers can fix
    state and retry idempotently.
    """
    import time as _time

    from arborist.store import append_audit, discover_shards, transaction

    paths: list[Path] = (
        discover_shards(shards_dir) if shards_dir else [Path(db_path)]
    )

    for sp in paths:
        c = connect(sp)
        try:
            row = c.execute(
                "SELECT document_root, document_uri, kind, title, source_type, "
                "       chunking_version, canonicalization_version, schema_version "
                "FROM documents WHERE document_root = ? AND kind = 'surface'",
                (document_root_value,),
            ).fetchone()
            if row is None:
                continue
            counts = _count_document_children(c, document_root_value)
            total_children = sum(counts.values())
            if total_children > 0 and not force:
                return {
                    "status": "refused_has_children",
                    "document_root": document_root_value,
                    "kind": "surface",
                    "shard": sp.name,
                    **counts,
                    "hint": "use --force to burn anyway (orphans descendants); "
                            "prefer evict for cold-tier compression",
                }
            chunk_count = c.execute(
                "SELECT COUNT(*) FROM chunks WHERE document_root = ?",
                (document_root_value,),
            ).fetchone()[0]
            now = int(_time.time())
            with transaction(c):
                # FTS5 has no FK to chunks; clear by chunk_id before the
                # CASCADE on documents wipes the rows that resolve them.
                for cr in c.execute(
                    "SELECT chunk_id FROM chunks WHERE document_root = ?",
                    (document_root_value,),
                ).fetchall():
                    c.execute(
                        "DELETE FROM chunks_fts WHERE rowid = ?",
                        (cr["chunk_id"],),
                    )
                # Outbound edges (src_root = this) carry no FK; clean them
                # explicitly so we don't leave half-edges pointing from a
                # ghost. Incoming edges (dst_root = this) are already gated
                # above by the children check.
                c.execute(
                    "DELETE FROM edges WHERE src_root = ?",
                    (document_root_value,),
                )
                # documents -> chunks/merkle_nodes/derivations cascade via FK.
                c.execute(
                    "DELETE FROM documents WHERE document_root = ?",
                    (document_root_value,),
                )
            event_hash = append_audit(
                c,
                event_type="document_burn",
                subject_root=document_root_value,
                body={
                    "document_root": document_root_value,
                    "document_uri": row["document_uri"],
                    "kind": "surface",
                    "title": row["title"],
                    "source_type": row["source_type"],
                    "burned_chunk_count": int(chunk_count),
                    "child_counts_at_burn": counts,
                    "reason": reason,
                    "by_actor": by_actor,
                    "forced": bool(total_children > 0 and force),
                },
                ts=now,
            )
            return {
                "status": "burned",
                "document_root": document_root_value,
                "kind": "surface",
                "shard": sp.name,
                "burned_chunk_count": int(chunk_count),
                "child_counts_at_burn": counts,
                "reason": reason,
                "by_actor": by_actor,
                "audit_event_hash": event_hash,
                "ts": now,
            }
        finally:
            c.close()

    return {"status": "not_found", "document_root": document_root_value, "kind": "surface"}


def _burn_core_root(
    document_root_value: str,
    *,
    reason: str,
    by_actor: str,
    shards_dir: Path | None,
    db_path: Path | None,
    force: bool = False,
) -> dict:
    """Delete a core document leaf, but only if it has no children.

    Same children gates as ``_burn_document_root`` (derivations.src_root,
    edges.dst_root, providence_cache.source_root). The "PLUS no further
    derivations build cores from this core" rule from the spec is
    structurally identical to derivations.src_root > 0 — a core acts as a
    src_root only when something deeper distilled from it.

    CLAUDE.md says "Cores never evict" — that's the eviction subsystem,
    which only touches kind='surface'. Burn is operator-driven removal:
    cores CAN be burned, but the children gate is enforced.

    Audit event type is 'core_burn' so chain consumers can distinguish
    surface vs core leaf removals at a glance.
    """
    import time as _time

    from arborist.store import append_audit, discover_shards, transaction

    paths: list[Path] = (
        discover_shards(shards_dir) if shards_dir else [Path(db_path)]
    )

    for sp in paths:
        c = connect(sp)
        try:
            row = c.execute(
                "SELECT document_root, document_uri, kind, title, source_type, "
                "       compression_depth, chunking_version, "
                "       canonicalization_version, schema_version "
                "FROM documents WHERE document_root = ? AND kind = 'core'",
                (document_root_value,),
            ).fetchone()
            if row is None:
                continue
            counts = _count_document_children(c, document_root_value)
            total_children = sum(counts.values())
            if total_children > 0 and not force:
                return {
                    "status": "refused_has_children",
                    "document_root": document_root_value,
                    "kind": "core",
                    "shard": sp.name,
                    **counts,
                    "hint": "use --force to burn anyway; cores carry "
                            "downstream derivations that will be orphaned",
                }
            chunk_count = c.execute(
                "SELECT COUNT(*) FROM chunks WHERE document_root = ?",
                (document_root_value,),
            ).fetchone()[0]
            # Inbound derivations (where this core is core_root, i.e. its
            # binding back to source surfaces). These are NOT children —
            # they're the core's own provenance and cascade-delete with it.
            inbound_derivations = c.execute(
                "SELECT COUNT(*) FROM derivations WHERE core_root = ?",
                (document_root_value,),
            ).fetchone()[0]
            now = int(_time.time())
            with transaction(c):
                for cr in c.execute(
                    "SELECT chunk_id FROM chunks WHERE document_root = ?",
                    (document_root_value,),
                ).fetchall():
                    c.execute(
                        "DELETE FROM chunks_fts WHERE rowid = ?",
                        (cr["chunk_id"],),
                    )
                c.execute(
                    "DELETE FROM edges WHERE src_root = ?",
                    (document_root_value,),
                )
                c.execute(
                    "DELETE FROM documents WHERE document_root = ?",
                    (document_root_value,),
                )
            event_hash = append_audit(
                c,
                event_type="core_burn",
                subject_root=document_root_value,
                body={
                    "document_root": document_root_value,
                    "document_uri": row["document_uri"],
                    "kind": "core",
                    "title": row["title"],
                    "source_type": row["source_type"],
                    "compression_depth": int(row["compression_depth"]),
                    "burned_chunk_count": int(chunk_count),
                    "burned_inbound_derivations": int(inbound_derivations),
                    "child_counts_at_burn": counts,
                    "reason": reason,
                    "by_actor": by_actor,
                    "forced": bool(total_children > 0 and force),
                },
                ts=now,
            )
            return {
                "status": "burned",
                "document_root": document_root_value,
                "kind": "core",
                "shard": sp.name,
                "burned_chunk_count": int(chunk_count),
                "burned_inbound_derivations": int(inbound_derivations),
                "child_counts_at_burn": counts,
                "reason": reason,
                "by_actor": by_actor,
                "audit_event_hash": event_hash,
                "ts": now,
            }
        finally:
            c.close()

    return {"status": "not_found", "document_root": document_root_value, "kind": "core"}


def _cmd_burn_kindergarten(args: argparse.Namespace) -> int:
    """Burn all providence_cache rows younger than the kindergarten window.

    Test-ergonomic mass burn: when iterating on retrieval/verifier knobs
    you want to wipe recent test runs without finding each cache_key.
    Mirrors the kindergarten window from `mesh sync` so what's still
    "private" (un-broadcast) is also what's safe to bust without
    confusing peers.

    Each row goes through the standard `_burn_cache_key` so the
    children gate is honored (use `--force` to override en masse).
    Each successful burn writes one ``providence_burn`` audit event;
    chain integrity is verifiable via `make chain-check-shards` after.
    """
    import time as _time
    from arborist.store import discover_shards

    now = int(_time.time())
    # `kindergarten_seconds <= 0` means "no time gate — burn every live
    # row" per the verb's docstring. The earlier `cutoff = now - 0`
    # treated 0 as a same-second-only window, which made
    # test_burn_kindergarten_zero_seconds_burns_everything timing-flaky:
    # if the wall-clock second rolled over between seed and burn,
    # cutoff > seed.created_at and nothing matched.
    kindergarten_seconds = max(0, args.kindergarten_seconds)
    no_time_gate = kindergarten_seconds == 0
    cutoff = 0 if no_time_gate else now - kindergarten_seconds
    shards_dir = Path(args.global_shards_dir) if args.global_shards_dir else None
    single_db = Path(args.db) if args.db else None
    paths: list[Path] = (
        discover_shards(shards_dir) if shards_dir else [single_db]
    )
    actor = args.by_actor or os.environ.get("USER", "unknown")
    reason = args.reason or f"burn-kindergarten window={args.kindergarten_seconds}s"

    examined = 0
    burned = 0
    refused = 0
    not_found = 0
    items: list[dict] = []
    for sp in paths:
        c = connect(sp)
        try:
            if no_time_gate:
                rows = c.execute(
                    "SELECT cache_key, created_at, audit_mode "
                    "FROM providence_cache "
                    "WHERE falsification_state = 'live' "
                    "ORDER BY created_at DESC"
                ).fetchall()
            else:
                rows = c.execute(
                    "SELECT cache_key, created_at, audit_mode "
                    "FROM providence_cache "
                    "WHERE created_at >= ? AND falsification_state = 'live' "
                    "ORDER BY created_at DESC",
                    (cutoff,),
                ).fetchall()
        finally:
            c.close()
        for r in rows:
            examined += 1
            if args.dry_run:
                items.append({
                    "cache_key": r["cache_key"],
                    "audit_mode": r["audit_mode"],
                    "created_at": r["created_at"],
                    "would_burn": True,
                })
                continue
            result = _burn_cache_key(
                r["cache_key"],
                reason=reason,
                by_actor=actor,
                shards_dir=shards_dir,
                db_path=single_db,
                force=bool(args.force),
            )
            status = result.get("status")
            if status == "burned":
                burned += 1
            elif status == "refused_has_children":
                refused += 1
            else:
                not_found += 1
            items.append(result)

    print(json.dumps({
        "status": "dry_run" if args.dry_run else "burned",
        "kindergarten_seconds": args.kindergarten_seconds,
        "cutoff_at": cutoff,
        "now": now,
        "examined": examined,
        "burned": burned,
        "refused_has_children": refused,
        "not_found": not_found,
        "items": items[: args.verbose],
    }, indent=2, ensure_ascii=False))
    return 0


def _cmd_burn(args: argparse.Namespace) -> int:
    """CLI: burn a leaf with no children.

    Dispatches on ``--kind`` to the matching helper. Default 'providence'
    preserves the original surface (`--cache-key` only) so existing scripts
    keep working. Document/core kinds use ``--root``.
    """
    kind = getattr(args, "kind", "providence") or "providence"
    shards = Path(args.global_shards_dir) if args.global_shards_dir else None
    db = Path(args.db) if args.db else None
    actor = args.by_actor or os.environ.get("USER", "unknown")
    reason = args.reason or ""
    force = bool(args.force)

    if kind == "providence":
        if not getattr(args, "cache_key", None):
            print("burn --kind providence requires --cache-key", file=sys.stderr)
            return 2
        result = _burn_cache_key(
            args.cache_key,
            reason=reason,
            by_actor=actor,
            shards_dir=shards,
            db_path=db,
            force=force,
        )
    elif kind in ("document", "core"):
        if not getattr(args, "root", None):
            print(f"burn --kind {kind} requires --root", file=sys.stderr)
            return 2
        helper = _burn_document_root if kind == "document" else _burn_core_root
        result = helper(
            args.root,
            reason=reason,
            by_actor=actor,
            shards_dir=shards,
            db_path=db,
            force=force,
        )
    else:
        print(f"unknown burn kind: {kind}", file=sys.stderr)
        return 2

    print(json.dumps(result, indent=2, ensure_ascii=False))
    return 0 if result.get("status") == "burned" else 1


def _cmd_providence(args: argparse.Namespace) -> int:
    """List providence_cache records or falsify one by cache_key."""
    if getattr(args, "falsify", None):
        result = _falsify_cache_key(
            args.falsify,
            state=args.state,
            reason=args.reason or "",
            by_actor=args.by_actor or os.environ.get("USER", "unknown"),
            shards_dir=Path(args.global_shards_dir) if args.global_shards_dir else None,
            db_path=Path(args.db) if args.db else None,
        )
        print(json.dumps(result, indent=2, ensure_ascii=False))
        return 0 if result.get("status") == "falsified" else 1
    if getattr(args, "show_preflight", None):
        # Ticket #000009 §7.2 — pull the preflight stage payload
        # from a row's run_dag_blob. Operator tool for inspecting
        # the policy state that governed the cached row.
        return _cmd_providence_show_preflight(
            cache_key_prefix=args.show_preflight,
            shards_dir=args.global_shards_dir,
            db=args.db,
        )
    conn = (
        connect_query(args.db, shards_dir=args.global_shards_dir)
        if args.global_shards_dir
        else connect(args.db)
    )
    try:
        if args.document_uri:
            rows = conn.execute(
                "SELECT cache_key, question_text, answer_text, falsification_state, "
                " hit_count, created_at FROM providence_cache "
                "WHERE document_uri = ? ORDER BY created_at DESC",
                (args.document_uri,),
            ).fetchall()
        elif args.source_root:
            rows = conn.execute(
                "SELECT cache_key, question_text, answer_text, falsification_state, "
                " hit_count, created_at FROM providence_cache "
                "WHERE source_root = ? ORDER BY created_at DESC",
                (args.source_root,),
            ).fetchall()
        else:
            rows = conn.execute(
                "SELECT cache_key, question_text, answer_text, falsification_state, "
                " hit_count, created_at FROM providence_cache "
                "ORDER BY created_at DESC LIMIT ?",
                (args.limit,),
            ).fetchall()
    finally:
        conn.close()
    out = [
        {
            "cache_key": r["cache_key"],
            "state": r["falsification_state"],
            "hit_count": r["hit_count"],
            "question": r["question_text"],
            "answer": r["answer_text"],
            "created_at": r["created_at"],
        }
        for r in rows
    ]
    print(json.dumps(out, indent=2, ensure_ascii=False))
    return 0


def _cmd_providence_show_preflight(
    *,
    cache_key_prefix: str,
    shards_dir: str | None,
    db: str | None,
) -> int:
    """Render the ``preflight`` stage payload for a cached row.

    Ticket #000009 §7.2. Pulls ``run_dag_blob`` for the matching
    cache row, parses the JSON, finds the ``preflight`` node, and
    pretty-prints the five nested CTI clauses (classifier,
    answer_contract, prompt_contract, evidence_contract,
    policy_refs) plus the metacognition QuestionState.

    Match is by 12-char prefix on ``cache_key`` (matches what
    bench rows + `_render_query_human` already truncate to).
    Returns 0 on success, 1 on miss / parse failure.
    """
    conn = (
        connect_query(db, shards_dir=shards_dir)
        if shards_dir
        else connect(db)
    )
    try:
        rows = conn.execute(
            "SELECT cache_key, question_text, run_dag_blob "
            "FROM providence_cache WHERE cache_key LIKE ? "
            "ORDER BY created_at DESC LIMIT 5",
            (cache_key_prefix + "%",),
        ).fetchall()
    finally:
        conn.close()
    if not rows:
        print(
            f"  no providence_cache row matching cache_key prefix "
            f"'{cache_key_prefix}'", file=sys.stderr,
        )
        return 1
    if len(rows) > 1:
        print(
            f"  {len(rows)} rows match prefix '{cache_key_prefix}'; "
            "rendering most recent. Pass a longer prefix to disambiguate.",
            file=sys.stderr,
        )
    row = rows[0]
    blob = row["run_dag_blob"]
    if not blob:
        print(
            f"  cache_key {row['cache_key'][:12]}: no run_dag_blob "
            "(legacy row, predates #000009)",
            file=sys.stderr,
        )
        return 1
    try:
        parsed = json.loads(blob)
    except json.JSONDecodeError as exc:
        print(f"  run_dag_blob parse error: {exc}", file=sys.stderr)
        return 1
    nodes = parsed.get("nodes") or []
    preflight_node = next(
        (n for n in nodes if isinstance(n, dict)
         and n.get("stage") == "preflight"),
        None,
    )
    if preflight_node is None:
        print(
            f"  cache_key {row['cache_key'][:12]}: run_dag has no "
            "preflight stage (predates #000009 binding)",
            file=sys.stderr,
        )
        return 1
    # Pull the full preflight payload (Ticket #000009 §7.2 — payload
    # now persisted alongside nodes via build_run_dag's
    # preflight_payload kwarg). Fall back to hash-only render for
    # legacy rows whose blob predates the payload-storage commit.
    payload = parsed.get("preflight_payload")
    out: dict = {
        "cache_key": row["cache_key"][:12],
        "question": row["question_text"],
        "preflight_stage_hash": preflight_node.get("hash"),
        "preflight_hash_12": (preflight_node.get("hash") or "")[:12],
        "run_dag_root": parsed.get("root"),
        "run_dag_stages": [n.get("stage") for n in nodes],
    }
    if payload is not None:
        # Verify the persisted payload hashes to the persisted leaf.
        # Mismatch would indicate post-write tampering or a serialization
        # drift; surface it explicitly so an auditor can detect.
        from arborist.qa.dag import _canonical_json, _sha256_hex
        recomputed = _sha256_hex(_canonical_json(payload))
        out["preflight_payload"] = payload
        out["payload_hash_check"] = (
            "ok" if recomputed == preflight_node.get("hash")
            else f"MISMATCH (recomputed {recomputed[:12]} != stored {preflight_node.get('hash', '')[:12]})"
        )
    else:
        out["preflight_payload"] = None
        out["payload_hash_check"] = (
            "unavailable: legacy row predates preflight_payload "
            "persistence (Ticket #000009 §7.2)"
        )
    print(json.dumps(out, indent=2, ensure_ascii=False))
    return 0


def _cmd_controller_events(args: argparse.Namespace) -> int:
    """List ``controller_events`` rows from one shard or every shard.

    Read-only inspector for the Phase 2 advisory writes
    (``arborist.qa.runner._emit_qa_controller_advisory``). Surfaces
    decision / difficulty / budget_allocation rows so operators can
    measure the Retrigger 1 signal (#000045 §3) without raw SQL.
    """
    import sqlite3 as _sqlite3
    import time as _time

    shards_dir = args.global_shards_dir
    db_path = args.db
    if shards_dir:
        shard_paths = sorted(Path(shards_dir).glob("*.db"))
    elif db_path:
        shard_paths = [Path(db_path)]
    else:
        shard_paths = [Path(DEFAULT_DB_PATH)]

    where: list[str] = []
    params: list = []
    if args.kind:
        where.append("event_kind = ?")
        params.append(args.kind)
    if args.organism_prefix:
        where.append("organism_root LIKE ?")
        params.append(args.organism_prefix + "%")
    if args.since_seconds is not None:
        where.append("recorded_at >= ?")
        params.append(int(_time.time()) - args.since_seconds)
    where_sql = (" WHERE " + " AND ".join(where)) if where else ""
    sql = (
        "SELECT event_id, organism_root, branch_id, event_kind, label,"
        " entropy, difficulty, allocation, body_blob, recorded_at"
        " FROM controller_events"
        + where_sql
        + " ORDER BY recorded_at DESC, event_id DESC LIMIT ?"
    )

    out: list[dict] = []
    summary: dict[str, int] = {}
    remaining = args.limit
    for sp in shard_paths:
        if remaining <= 0:
            break
        if sp.suffix != ".db" or "-shm" in sp.name or "-wal" in sp.name:
            continue
        try:
            conn = _sqlite3.connect(f"file:{sp}?mode=ro", uri=True)
        except _sqlite3.OperationalError:
            continue
        conn.row_factory = _sqlite3.Row
        has_table = conn.execute(
            "SELECT name FROM sqlite_master "
            "WHERE type='table' AND name='controller_events'"
        ).fetchone()
        if not has_table:
            conn.close()
            continue
        try:
            rows = conn.execute(sql, [*params, remaining]).fetchall()
        except _sqlite3.OperationalError:
            conn.close()
            continue
        conn.close()
        for r in rows:
            kind = r["event_kind"]
            summary[kind] = summary.get(kind, 0) + 1
            entry = {
                "shard": sp.name,
                "event_id": r["event_id"],
                "kind": kind,
                "organism_root": r["organism_root"],
                "branch_id": r["branch_id"],
                "label": r["label"],
                "entropy": r["entropy"],
                "difficulty": r["difficulty"],
                "allocation": r["allocation"],
                "recorded_at": r["recorded_at"],
                "recorded_at_iso": _time.strftime(
                    "%Y-%m-%dT%H:%M:%SZ", _time.gmtime(r["recorded_at"])
                ),
            }
            if args.body:
                try:
                    entry["body"] = json.loads(r["body_blob"])
                except (TypeError, ValueError):
                    entry["body"] = r["body_blob"]
            out.append(entry)
        remaining = args.limit - len(out)

    if args.json:
        print(json.dumps(
            {"summary": summary, "rows": out},
            indent=2, ensure_ascii=False,
        ))
        return 0

    if not out:
        print("(no controller_events rows matched)")
        return 0
    print(f"# controller_events — {len(out)} row(s) across "
          f"{len({e['shard'] for e in out})} shard(s)")
    for kind, n in sorted(summary.items(), key=lambda kv: -kv[1]):
        print(f"#   {kind}: {n}")
    print()
    print(
        f"{'shard':<10} {'kind':<29} {'label':<10} "
        f"{'diff':>6} {'alloc':>6} {'recorded_at_iso'}  organism"
    )
    for e in out:
        org = (e["organism_root"] or "")[:48]
        diff = "-" if e["difficulty"] is None else f"{e['difficulty']:.2f}"
        alloc = "-" if e["allocation"] is None else f"{e['allocation']:.2f}"
        print(
            f"{e['shard']:<10} {e['kind']:<29} "
            f"{(e['label'] or '-'):<10} {diff:>6} {alloc:>6} "
            f"{e['recorded_at_iso']}  {org}"
        )
    return 0


def _load_record_context(row, shards_dir, qa_db):
    """Reassemble context for a providence record. Returns text or None
    if any source doc has no hot chunks (cold)."""
    from arborist.qa.query import _load_doc_text

    proof = json.loads(row["merkle_proof"])
    sources = proof.get("sources", [])
    if not sources:
        return None
    parts: list[str] = []
    for src in sources:
        shard_name = src.get("shard")
        if not shard_name:
            return None
        if shards_dir:
            shard_path = shards_dir / shard_name
        else:
            shard_path = qa_db.parent / shard_name
        if not shard_path.exists():
            return None
        text = _load_doc_text(str(shard_path), src["document_root"])
        if not text:
            return None
        parts.append(text)
    return "\n\n".join(parts)


def _cmd_reclassify(args: argparse.Namespace) -> int:
    """Re-run the layered verifier against existing live providence records.

    Reads each record's answer + reassembles its context from
    merkle_proof.sources, runs verify_quotes(), and updates the row only
    if the verdict differs from what's stored. No LLM calls — this just
    relabels existing answers under the current verifier.

    Cold-source records (where any source doc has no hot chunks) are
    skipped: we can't faithfully reclassify without the original context.
    Run `arborist rehydrate` first if you want those covered too.

    `--compare` runs all four entity policies side-by-side without
    writing — use it to see what each policy would produce on real data
    before committing to one. `--entity-policy X` writes under a single
    policy.

    Each changed record gets one 'providence_reclassify' audit event
    with old & new state for chain-of-custody.
    """
    import time
    from collections import defaultdict

    from arborist.qa.verify import (
        DEFAULT_ENTITY_POLICY,
        ENTITY_POLICIES,
        verify_quotes,
    )

    qa_db = args.qa_db
    if qa_db is None:
        if args.global_shards_dir:
            qa_db = Path(args.global_shards_dir) / "qa.db"
        else:
            qa_db = Path.home() / ".arborist" / "qa.db"
    qa_db = Path(qa_db)

    shards_dir = (
        Path(args.global_shards_dir) if args.global_shards_dir else None
    )

    conn = connect(qa_db)
    try:
        sql = (
            "SELECT cache_key, answer_text, merkle_proof, audit_mode, "
            " verifier_method, n_quotes, n_verified, unverified_quotes, "
            " question_text "
            "FROM providence_cache "
            "WHERE falsification_state = 'live' "
            "ORDER BY created_at DESC"
        )
        if args.limit:
            sql += f" LIMIT {int(args.limit)}"
        rows = conn.execute(sql).fetchall()

        if args.compare:
            # Run all four policies side-by-side, no DB write. Output is a
            # per-record grid + a per-policy distribution summary so fox can
            # eyeball where the policies disagree.
            grid = []
            distribution: dict[str, dict[str, int]] = {
                p: defaultdict(int) for p in ENTITY_POLICIES
            }
            skipped_cold = 0
            for row in rows:
                context = _load_record_context(row, shards_dir, qa_db)
                if context is None:
                    skipped_cold += 1
                    continue
                per_policy = {}
                for p in ENTITY_POLICIES:
                    v = verify_quotes(row["answer_text"], context, entity_policy=p)
                    label = f"{v['audit_mode']}/{v['verifier_method']}"
                    per_policy[p] = label
                    distribution[p][label] += 1
                grid.append({
                    "cache_key": row["cache_key"][:16] + "…",
                    "question": row["question_text"][:55],
                    **per_policy,
                })
            print(json.dumps({
                "examined": len(grid),
                "skipped_cold": skipped_cold,
                "distribution": {p: dict(d) for p, d in distribution.items()},
                "records": grid,
            }, indent=2, ensure_ascii=False))
            return 0

        # Single-policy reclassify. Default tracks DEFAULT_ENTITY_POLICY
        # so the CLI always matches the verifier's current contract.
        policy_name = args.entity_policy or DEFAULT_ENTITY_POLICY
        if policy_name not in ENTITY_POLICIES:
            print(
                f"--entity-policy must be one of {ENTITY_POLICIES}, "
                f"got {policy_name!r}",
                file=sys.stderr,
            )
            return 2

        summary = {
            "examined": 0,
            "changed": 0,
            "skipped_cold": 0,
            "unchanged": 0,
            "entity_policy": policy_name,
            "transitions": defaultdict(int),
        }

        for row in rows:
            summary["examined"] += 1
            context = _load_record_context(row, shards_dir, qa_db)
            if context is None:
                summary["skipped_cold"] += 1
                continue

            verdict = verify_quotes(
                row["answer_text"], context, entity_policy=policy_name
            )

            old_unverified = row["unverified_quotes"] or "null"
            new_unverified_blob = (
                json.dumps(verdict["unverified_quotes"], separators=(",", ":"))
                if verdict["unverified_quotes"]
                else None
            )
            new_unverified_for_compare = new_unverified_blob or "null"

            unchanged = (
                verdict["audit_mode"] == row["audit_mode"]
                and verdict["verifier_method"] == row["verifier_method"]
                and verdict["n_quotes"] == row["n_quotes"]
                and verdict["n_verified"] == row["n_verified"]
                and old_unverified == new_unverified_for_compare
            )
            if unchanged:
                summary["unchanged"] += 1
                continue

            summary["changed"] += 1
            transition = (
                f"{row['audit_mode']}/{row['verifier_method']} "
                f"-> {verdict['audit_mode']}/{verdict['verifier_method']}"
            )
            summary["transitions"][transition] += 1

            if args.dry_run:
                continue

            now = int(time.time())
            with transaction(conn):
                event_hash = append_audit(
                    conn,
                    event_type="providence_reclassify",
                    subject_root=row["cache_key"],
                    body={
                        "old_audit_mode": row["audit_mode"],
                        "new_audit_mode": verdict["audit_mode"],
                        "old_method": row["verifier_method"],
                        "new_method": verdict["verifier_method"],
                        "old_n_verified": row["n_verified"],
                        "new_n_verified": verdict["n_verified"],
                        "entity_policy": policy_name,
                    },
                    ts=now,
                )
                conn.execute(
                    "UPDATE providence_cache SET "
                    " audit_mode = ?, n_quotes = ?, n_verified = ?, "
                    " unverified_quotes = ?, verifier_method = ?, "
                    " audit_event_hash = ? "
                    "WHERE cache_key = ?",
                    (
                        verdict["audit_mode"],
                        verdict["n_quotes"],
                        verdict["n_verified"],
                        new_unverified_blob,
                        verdict["verifier_method"],
                        event_hash,
                        row["cache_key"],
                    ),
                )
    finally:
        conn.close()

    summary["transitions"] = dict(summary["transitions"])
    summary["dry_run"] = bool(args.dry_run)
    print(json.dumps(summary, indent=2, ensure_ascii=False))
    return 0


def _cmd_emergent(args: argparse.Namespace) -> int:
    """Surface emergent claims from UNGROUNDED/HYBRID providence records.

    These are spans the model produced that don't appear verbatim in the
    corpus — candidate ingest targets. Frequent unverified quotes signal
    knowledge the model has from training that our corpus is missing.
    """
    conn = (
        connect_query(args.db, shards_dir=args.global_shards_dir)
        if args.global_shards_dir
        else connect(args.db)
    )
    try:
        if args.aggregate:
            rows = conn.execute(
                "SELECT unverified_quotes FROM providence_cache "
                "WHERE audit_mode IN ('UNGROUNDED','HYBRID') "
                " AND falsification_state = 'live' "
                " AND unverified_quotes IS NOT NULL"
            ).fetchall()
            counts: dict[str, int] = {}
            for r in rows:
                for q in json.loads(r["unverified_quotes"]):
                    counts[q] = counts.get(q, 0) + 1
            ranked = sorted(counts.items(), key=lambda kv: -kv[1])[: args.limit]
            print(json.dumps(
                [{"quote": q, "count": c} for q, c in ranked],
                indent=2, ensure_ascii=False
            ))
        else:
            rows = conn.execute(
                "SELECT cache_key, audit_mode, verifier_method, question_text, "
                " n_quotes, n_verified, unverified_quotes, created_at "
                "FROM providence_cache "
                "WHERE audit_mode IN ('UNGROUNDED','HYBRID') "
                " AND falsification_state = 'live' "
                "ORDER BY created_at DESC LIMIT ?",
                (args.limit,),
            ).fetchall()
            out = [
                {
                    "cache_key": r["cache_key"],
                    "audit_mode": r["audit_mode"],
                    "verifier_method": r["verifier_method"],
                    "question": r["question_text"],
                    "n_quotes": r["n_quotes"],
                    "n_verified": r["n_verified"],
                    "unverified_quotes": (
                        json.loads(r["unverified_quotes"])
                        if r["unverified_quotes"]
                        else []
                    ),
                    "created_at": r["created_at"],
                }
                for r in rows
            ]
            print(json.dumps(out, indent=2, ensure_ascii=False))
    finally:
        conn.close()
    return 0


def _cmd_evict(args: argparse.Namespace) -> int:
    """Demote surface chunks from hot to cold (NULL content). Cores never evict."""
    from arborist.evict import evict_to_cold

    conn = (
        connect_query(args.db, shards_dir=args.global_shards_dir)
        if args.global_shards_dir
        else connect(args.db)
    )
    try:
        result = evict_to_cold(
            conn,
            source_type=args.source_type,
            older_than_days=args.older_than_days,
            document_roots=args.document_root or None,
        )
    finally:
        conn.close()
    print(json.dumps(result, indent=2, ensure_ascii=False))
    return 0


def _cmd_rehydrate(args: argparse.Namespace) -> int:
    """Rehydrate cold chunks from source; non-zero exit if drift detected."""
    from arborist.evict import rehydrate

    conn = (
        connect_query(args.db, shards_dir=args.global_shards_dir)
        if args.global_shards_dir
        else connect(args.db)
    )
    try:
        if args.all_cold:
            roots = [
                r["document_root"]
                for r in conn.execute(
                    "SELECT DISTINCT document_root FROM chunks WHERE tier = 'cold'"
                ).fetchall()
            ]
        else:
            roots = list(args.document_root or [])
        if not roots:
            print(
                "rehydrate needs --document-root R or --all-cold",
                file=sys.stderr,
            )
            return 2
        results = []
        for r in roots:
            res = rehydrate(conn, r)
            res["document_root"] = r
            results.append(res)
    finally:
        conn.close()
    print(json.dumps(results, indent=2, ensure_ascii=False))
    drift = sum(1 for r in results if r.get("status") == "drift_detected")
    return 1 if drift else 0


def _cmd_activity(args: argparse.Namespace) -> int:
    """Recent activity: Q&A records + freshly cached docs across all shards.

    Designed for an agent to read before deciding the next action — what was
    just asked, what was just integrated, what's the corpus state.
    """
    import time as _time

    from arborist.store import discover_shards

    shard_paths: list[Path] = []
    if args.global_shards_dir:
        shard_paths = discover_shards(args.global_shards_dir)
    else:
        shard_paths = [Path(args.db)]

    cutoff_ts = 0
    if args.since_seconds:
        cutoff_ts = int(_time.time()) - args.since_seconds

    qa_records: list[dict] = []
    ingest_events: list[dict] = []
    derive_events: list[dict] = []
    falsifications: list[dict] = []
    corpus = {
        "documents_total": 0,
        "documents_surface": 0,
        "documents_core": 0,
        "providence_total": 0,
        "providence_live": 0,
        "providence_stale": 0,
        "providence_failed": 0,
        "audit_events_total": 0,
    }

    for sp in shard_paths:
        c = connect(sp)
        try:
            corpus["documents_total"] += c.execute(
                "SELECT COUNT(*) FROM documents"
            ).fetchone()[0]
            corpus["documents_surface"] += c.execute(
                "SELECT COUNT(*) FROM documents WHERE kind='surface'"
            ).fetchone()[0]
            corpus["documents_core"] += c.execute(
                "SELECT COUNT(*) FROM documents WHERE kind='core'"
            ).fetchone()[0]
            corpus["providence_total"] += c.execute(
                "SELECT COUNT(*) FROM providence_cache"
            ).fetchone()[0]
            corpus["providence_live"] += c.execute(
                "SELECT COUNT(*) FROM providence_cache WHERE falsification_state='live'"
            ).fetchone()[0]
            corpus["providence_stale"] += c.execute(
                "SELECT COUNT(*) FROM providence_cache WHERE falsification_state='stale'"
            ).fetchone()[0]
            corpus["providence_failed"] += c.execute(
                "SELECT COUNT(*) FROM providence_cache WHERE falsification_state='failed'"
            ).fetchone()[0]
            corpus["audit_events_total"] += c.execute(
                "SELECT COUNT(*) FROM audit_events"
            ).fetchone()[0]

            # Q&A records
            for r in c.execute(
                "SELECT cache_key, question_text, answer_text, "
                " falsification_state, hit_count, created_at, last_hit_at, "
                " document_uri FROM providence_cache "
                "WHERE created_at >= ? ORDER BY created_at DESC LIMIT ?",
                (cutoff_ts, args.limit),
            ).fetchall():
                ans = r["answer_text"] or ""
                qa_records.append(
                    {
                        "ts": r["created_at"],
                        "shard": sp.name,
                        "cache_key": r["cache_key"],
                        "question": r["question_text"],
                        "answer_preview": (
                            ans if len(ans) <= args.preview_chars
                            else ans[: args.preview_chars] + "…"
                        ),
                        "sources_uri": r["document_uri"],
                        "state": r["falsification_state"],
                        "hit_count": r["hit_count"],
                        "last_hit_at": r["last_hit_at"],
                    }
                )

            # Recent ingest events
            for r in c.execute(
                "SELECT subject_root, body, ts FROM audit_events "
                "WHERE event_type = 'ingest' AND ts >= ? "
                "ORDER BY ts DESC LIMIT ?",
                (cutoff_ts, args.limit),
            ).fetchall():
                body = json.loads(r["body"]) if r["body"] else {}
                ingest_events.append(
                    {
                        "ts": r["ts"],
                        "shard": sp.name,
                        "document_root": r["subject_root"],
                        "document_uri": body.get("document_uri"),
                        "source_type": body.get("source_type"),
                        "chunks": body.get("chunks"),
                        "supersedes": body.get("supersedes"),
                    }
                )

            # Recent derive events (distillations)
            for r in c.execute(
                "SELECT subject_root, body, ts FROM audit_events "
                "WHERE event_type = 'derive' AND ts >= ? "
                "ORDER BY ts DESC LIMIT ?",
                (cutoff_ts, args.limit),
            ).fetchall():
                body = json.loads(r["body"]) if r["body"] else {}
                derive_events.append(
                    {
                        "ts": r["ts"],
                        "shard": sp.name,
                        "core_root": r["subject_root"],
                        "src_root": body.get("src_root"),
                        "process_id": body.get("process_id"),
                        "compression_ratio": body.get("compression_ratio"),
                        "compression_depth": body.get("compression_depth"),
                    }
                )

            # Recent falsifications
            for r in c.execute(
                "SELECT cache_key, state, reason, by_actor, at FROM falsifications "
                "WHERE at >= ? ORDER BY at DESC LIMIT ?",
                (cutoff_ts, args.limit),
            ).fetchall():
                falsifications.append(
                    {
                        "ts": r["at"],
                        "shard": sp.name,
                        "cache_key": r["cache_key"],
                        "state": r["state"],
                        "reason": r["reason"],
                        "by_actor": r["by_actor"],
                    }
                )
        finally:
            c.close()

    qa_records.sort(key=lambda x: -x["ts"])
    ingest_events.sort(key=lambda x: -x["ts"])
    derive_events.sort(key=lambda x: -x["ts"])
    falsifications.sort(key=lambda x: -x["ts"])

    print(
        json.dumps(
            {
                "as_of": int(_time.time()),
                "shards": [str(p) for p in shard_paths],
                "corpus": corpus,
                "recent_qa": qa_records[: args.limit],
                "recent_ingests": ingest_events[: args.limit],
                "recent_derives": derive_events[: args.limit],
                "recent_falsifications": falsifications[: args.limit],
            },
            indent=2, ensure_ascii=False
        )
    )
    return 0


def _cmd_canon(args: argparse.Namespace) -> int:
    """Direct π* canonicalization — no shards, no LLM, no audit chain.

    Two modes:
      ``arborist canon --list``                — print registry contents.
      ``arborist canon <key> "<input>"``       — canonicalize and print.
    """
    import hashlib

    from arborist.pi_star import PiStarError, get, list_keys

    if args.list:
        from arborist.pi_star import REGISTRY
        for key in sorted(list_keys()):
            ps = REGISTRY[key]
            print(f"{key}\t{ps.domain}")
        return 0

    if not args.key or args.input is None:
        print(
            "error: KEY and INPUT required (or --list to see registry)",
            file=sys.stderr,
        )
        return 2

    try:
        ps = get(args.key)
    except KeyError:
        print(f"error: unknown π*: {args.key!r}", file=sys.stderr)
        print(
            f"available: {', '.join(sorted(list_keys()))}",
            file=sys.stderr,
        )
        return 2

    try:
        canonical = ps.canonicalize(args.input.encode("utf-8"))
    except PiStarError as exc:
        print(f"error: {exc}", file=sys.stderr)
        return 1
    except NotImplementedError as exc:
        print(f"error: {args.key} not yet implemented ({exc})", file=sys.stderr)
        return 1

    if args.json:
        digest = hashlib.sha256(canonical).hexdigest()
        print(json.dumps({
            "pi_star_ref": args.key,
            "input": args.input,
            "canonical": canonical.decode("utf-8", errors="replace"),
            "canonical_sha256": digest,
        }, ensure_ascii=False))
    else:
        sys.stdout.buffer.write(canonical)
        sys.stdout.buffer.write(b"\n")
    return 0


def _cmd_stats(args: argparse.Namespace) -> int:
    """Counts: documents, chunks (by tier), edges, providence, audit events."""
    conn = (
        connect_query(args.db, shards_dir=args.global_shards_dir)
        if args.global_shards_dir
        else connect(args.db)
    )
    try:
        result = stats(conn)
    finally:
        conn.close()
    print(json.dumps(result, indent=2, ensure_ascii=False))
    return 0


def _check_audit_chain(conn: sqlite3.Connection) -> tuple[int, int]:
    """Return (events_checked, breaks) for one audit chain in `conn`."""
    import hashlib

    rows = conn.execute(
        "SELECT seq, event_hash, prev_event_hash, body FROM audit_events ORDER BY seq"
    ).fetchall()
    prev = None
    breaks = 0
    for r in rows:
        h = hashlib.sha256()
        if r["prev_event_hash"]:
            h.update(bytes.fromhex(r["prev_event_hash"]))
        h.update(r["body"].encode("utf-8"))
        if h.hexdigest() != r["event_hash"]:
            breaks += 1
        if r["prev_event_hash"] != prev:
            breaks += 1
        prev = r["event_hash"]
    return len(rows), breaks


def _cmd_analyze(args: argparse.Namespace) -> int:
    """Compression spectrum, depth distribution, audit chain integrity."""
    from arborist.store import discover_shards

    # In sharded mode, audit chains live per-shard (each shard has its own
    # genesis -> latest). Check each independently and aggregate.
    audit_summary: dict | None = None
    if args.global_shards_dir:
        per_shard_chain = []
        total_events = 0
        total_breaks = 0
        for sp in discover_shards(args.global_shards_dir):
            # Read-only: an audit-chain walk must not run migration DDL
            # or take a write lock on the shard (see connect_readonly).
            sc = connect_readonly(sp)
            try:
                ev, br = _check_audit_chain(sc)
            finally:
                sc.close()
            per_shard_chain.append({"shard": sp.name, "events": ev, "breaks": br})
            total_events += ev
            total_breaks += br
        audit_summary = {
            "events": total_events,
            "breaks": total_breaks,
            "shards": per_shard_chain,
        }

    conn = (
        connect_query(args.db, shards_dir=args.global_shards_dir)
        if args.global_shards_dir
        else connect(args.db)
    )
    try:
        # Depth distribution.
        depth = conn.execute(
            "SELECT compression_depth, COUNT(*) AS n "
            "FROM documents GROUP BY compression_depth ORDER BY 1"
        ).fetchall()

        # Per-process compression ratios.
        procs = conn.execute(
            "SELECT json_extract(body, '$.process_id') AS process_id, "
            "       json_extract(body, '$.src_kind') AS src_kind, "
            "       AVG(CAST(json_extract(body, '$.compression_ratio') AS REAL)) AS mean_ratio, "
            "       MIN(CAST(json_extract(body, '$.compression_ratio') AS REAL)) AS min_ratio, "
            "       MAX(CAST(json_extract(body, '$.compression_ratio') AS REAL)) AS max_ratio, "
            "       COUNT(*) AS n_events "
            "FROM audit_events WHERE event_type='derive' "
            "GROUP BY process_id, src_kind"
        ).fetchall()

        # Source/kind crosstab.
        kinds = conn.execute(
            "SELECT source_type, kind, COUNT(*) AS n "
            "FROM documents GROUP BY source_type, kind ORDER BY 3 DESC"
        ).fetchall()

        # Tier distribution.
        tiers = conn.execute(
            "SELECT tier, COUNT(*) AS n FROM chunks GROUP BY tier"
        ).fetchall()

        # Top inbound link targets — the 'gravity wells' of the corpus,
        # counted per resolved destination document (dst_root), which the
        # idx_edges_dst_root partial index serves directly: an index-ordered
        # scan -> streaming GROUP BY, bounded memory. (Grouping by the raw
        # dst_uri link string instead has no index and hash-aggregates over
        # every target including red links -> unbounded RSS at corpus scale.)
        # Join documents only for the surviving top-N rows, for titles.
        gravity = conn.execute(
            "SELECT g.root AS root, g.inbound AS inbound, "
            "       d.document_uri AS uri, d.title AS title "
            "FROM (SELECT dst_root AS root, COUNT(*) AS inbound "
            "        FROM edges "
            "       WHERE edge_type='wikilink' AND dst_root <> '' "
            "       GROUP BY dst_root ORDER BY inbound DESC LIMIT ?) g "
            "LEFT JOIN documents d ON d.document_root = g.root "
            "ORDER BY g.inbound DESC",
            (args.gravity_top,),
        ).fetchall()

        # Audit chain integrity (per-shard if sharded; single chain otherwise).
        if audit_summary is None:
            ev, br = _check_audit_chain(conn)
            audit_summary = {"events": ev, "breaks": br}

        report = {
            "compression_depth_histogram": [
                {"depth": r["compression_depth"], "count": r["n"]} for r in depth
            ],
            "distillers": [
                {
                    "process_id": r["process_id"],
                    "src_kind": r["src_kind"],
                    "n_events": r["n_events"],
                    "compression_ratio": {
                        "mean": (
                            round(r["mean_ratio"], 4)
                            if r["mean_ratio"] is not None
                            else None
                        ),
                        "min": (
                            round(r["min_ratio"], 4)
                            if r["min_ratio"] is not None
                            else None
                        ),
                        "max": (
                            round(r["max_ratio"], 4)
                            if r["max_ratio"] is not None
                            else None
                        ),
                    },
                }
                for r in procs
            ],
            "documents_by_source_kind": [
                {"source_type": r["source_type"], "kind": r["kind"], "count": r["n"]}
                for r in kinds
            ],
            "chunks_by_tier": {r["tier"]: r["n"] for r in tiers},
            "audit_chain": audit_summary,
            "gravity_top_inbound": [
                {
                    "root": r["root"],
                    "uri": r["uri"],
                    "title": r["title"],
                    "inbound": r["inbound"],
                }
                for r in gravity
            ],
        }
    finally:
        conn.close()
    print(json.dumps(report, indent=2, ensure_ascii=False))
    return 0


def _cmd_snapshot_create(args: argparse.Namespace) -> int:
    """Compute snapshot_root over the read scope, persist into args.db.

    Single-DB mode (--db only): read + write are the same connection;
    delegate to the snapshot module's create_snapshot().

    Cross-shard mode (--shards-dir + --db): read against the in-memory
    UNION view to get the cluster-level Merkle root, then persist into
    args.db (a dedicated snapshots store, conventionally
    `~/.arborist/shards/snapshots.db`). The writer's own documents table
    is irrelevant to the snapshot value — only the union scope counts.
    """
    import time as _time

    from arborist.snapshot import compute_snapshot_root, create_snapshot

    if args.global_shards_dir is None:
        conn = connect(args.db)
        try:
            result = create_snapshot(
                conn, reason=args.reason, parent_snapshot=args.parent,
            )
        finally:
            conn.close()
        print(json.dumps(result, indent=2, ensure_ascii=False))
        return 0

    # Cross-shard: compute against UNION, write to args.db.
    read_conn = connect_query(args.db, shards_dir=args.global_shards_dir)
    try:
        snapshot_root, doc_count = compute_snapshot_root(read_conn)
    finally:
        read_conn.close()

    write_conn = connect(args.db)
    try:
        parent = args.parent
        if parent is None:
            row = write_conn.execute(
                "SELECT snapshot_root FROM snapshots ORDER BY taken_at DESC LIMIT 1"
            ).fetchone()
            if row is not None:
                parent = row["snapshot_root"]

        now = int(_time.time())
        body = {
            "snapshot_root": snapshot_root,
            "doc_count": doc_count,
            "parent_snapshot": parent,
            "reason": args.reason,
            "scope": "shards-union",
        }
        audit_event_hash = append_audit(
            write_conn,
            event_type="snapshot_create",
            body=body,
            subject_root=snapshot_root,
            ts=now,
        )
        with transaction(write_conn):
            write_conn.execute(
                "INSERT OR IGNORE INTO snapshots "
                "(snapshot_root, taken_at, audit_event_hash, doc_count, "
                " parent_snapshot, reason) VALUES (?, ?, ?, ?, ?, ?)",
                (
                    snapshot_root,
                    now,
                    audit_event_hash,
                    doc_count,
                    parent,
                    args.reason,
                ),
            )
    finally:
        write_conn.close()

    print(
        json.dumps(
            {
                "snapshot_root": snapshot_root,
                "doc_count": doc_count,
                "parent_snapshot": parent,
                "audit_event_hash": audit_event_hash,
                "taken_at": now,
                "reason": args.reason,
                "scope": "shards-union",
            },
            indent=2, ensure_ascii=False
        )
    )
    return 0


def _cmd_snapshot_list(args: argparse.Namespace) -> int:
    """List recent corpus snapshots (newest first, --limit N)."""
    from arborist.snapshot import list_snapshots

    conn = connect(args.db)
    try:
        rows = list_snapshots(conn, limit=args.limit)
    finally:
        conn.close()
    print(json.dumps(rows, indent=2, ensure_ascii=False))
    return 0


def _cmd_snapshot_verify(args: argparse.Namespace) -> int:
    """Re-derive snapshot root from current corpus; non-zero exit on drift."""
    from arborist.snapshot import verify_snapshot

    conn = (
        connect_query(args.db, shards_dir=args.global_shards_dir)
        if args.global_shards_dir
        else connect(args.db)
    )
    try:
        result = verify_snapshot(conn, args.snapshot_root)
    finally:
        conn.close()
    print(json.dumps(result, indent=2, ensure_ascii=False))
    return 0 if result["matches"] else 1


def _cmd_snapshot_diff(args: argparse.Namespace) -> int:
    from arborist.snapshot import diff_against_current

    conn = (
        connect_query(args.db, shards_dir=args.global_shards_dir)
        if args.global_shards_dir
        else connect(args.db)
    )
    try:
        result = diff_against_current(conn, args.snapshot_root)
    finally:
        conn.close()
    print(json.dumps(result, indent=2, ensure_ascii=False))
    return 0


def _cmd_substrate_score(args: argparse.Namespace) -> int:
    """Compute the substrate ForkScore over (parent, child) bench-result JSON files."""
    from arborist.substrate import (
        bench_result_to_metrics,
        fork_score,
    )
    from arborist.substrate.weights import DEFAULT_WEIGHTS, from_dict as weights_from_dict

    with open(args.parent, "r", encoding="utf-8") as fh:
        parent_payload = json.load(fh)
    with open(args.child, "r", encoding="utf-8") as fh:
        child_payload = json.load(fh)

    if args.weights:
        with open(args.weights, "r", encoding="utf-8") as fh:
            weights = weights_from_dict(json.load(fh))
    else:
        weights = DEFAULT_WEIGHTS

    parent_metrics = bench_result_to_metrics(parent_payload)
    child_metrics = bench_result_to_metrics(child_payload)

    scored = fork_score(
        parent_metrics,
        child_metrics,
        weights=weights,
        capital_delta=float(args.capital_delta),
        selfmodel_calibration_gain=float(args.selfmodel_calibration_gain),
        audit_completeness=float(args.audit_completeness),
        validator_diversity=float(args.validator_diversity),
        security_risk=float(args.security_risk),
        complexity_delta=float(args.complexity_delta),
        memory_invalidation_count=float(args.memory_invalidation_count),
    )
    artifact = scored.to_dict()
    artifact_json = json.dumps(
        artifact, indent=2, ensure_ascii=False, default=str
    )
    print(artifact_json)
    # Phase 1b — also write to disk so CI / mesh peers / downstream
    # graders can pick up the artifact without parsing stdout.
    out_path = getattr(args, "out", None)
    if out_path:
        from pathlib import Path
        p = Path(out_path)
        p.parent.mkdir(parents=True, exist_ok=True)
        p.write_text(artifact_json + "\n", encoding="utf-8")

    # Phase 1c — branch-set persistence (#000012). Default off:
    # writes only when --branch-set is present. --branch-id falls
    # back to --child-root so a single-flag CLI works for the common
    # case (one branch per child-root identity).
    branch_set_id = getattr(args, "branch_set", None)
    if branch_set_id:
        from arborist.substrate.fork_score import persist_branch_score

        parent_root = getattr(args, "parent_root", None)
        child_root = getattr(args, "child_root", None)
        branch_id = getattr(args, "branch_id", None) or child_root
        if not parent_root or not branch_id:
            sys.stderr.write(
                "--branch-set requires --parent-root + (--branch-id "
                "or --child-root)\n"
            )
            return 2
        persist_shard = getattr(args, "persist_shard", None) or args.db
        weights_id = getattr(args, "weights_id", None) or (
            "default" if not args.weights else Path(args.weights).stem
        )
        p_conn = connect(persist_shard)
        try:
            with transaction(p_conn):
                persist_branch_score(
                    p_conn,
                    branch_set_id=branch_set_id,
                    branch_id=branch_id,
                    parent_root=parent_root,
                    child_root=child_root,
                    scored=scored,
                    weights_id=weights_id,
                )
        finally:
            p_conn.close()

    # Non-zero exit on REJECT so CI can gate on it.
    return 0 if scored.verdict in ("ACCEPT", "MARGINAL") else 1


def _cmd_memory_snapshot(args: argparse.Namespace) -> int:
    """Build a memory snapshot from current store state and persist it."""
    from arborist.memory import snapshot, store_snapshot

    conn = connect(args.db)
    try:
        with transaction(conn):
            ms = snapshot(conn)
            root = store_snapshot(conn, ms)
    finally:
        conn.close()
    print(json.dumps({"memory_root": root}, indent=2, ensure_ascii=False))
    return 0


def _cmd_memory_show(args: argparse.Namespace) -> int:
    """Print a memory record by root, or the latest live one."""
    from arborist.memory import branches_for, latest, load

    conn = connect(args.db)
    try:
        if args.root:
            row = load(conn, args.root)
        else:
            row = latest(conn)
        if row is None:
            print(json.dumps({"error": "no memory snapshot found"}, indent=2))
            return 1
        body = row["branch_summaries_blob"]
        if isinstance(body, (bytes, bytearray)):
            body = body.decode("utf-8", errors="replace")
        out = {
            "memory_root": row["memory_root"],
            "state": row["state"],
            "schema_version": row["schema_version"],
            "parent_memory_root": row["parent_memory_root"],
            "audit_events_high_water": row["audit_events_high_water"],
            "audit_event_hash": row["audit_event_hash"],
            "created_at": row["created_at"],
            "falsified_at": row["falsified_at"],
            "falsified_reason": row["falsified_reason"],
            "body": body,
            "branches": branches_for(conn, row["memory_root"]),
        }
    finally:
        conn.close()
    print(json.dumps(out, indent=2, ensure_ascii=False, default=str))
    return 0


def _cmd_memory_branches(args: argparse.Namespace) -> int:
    """List branch summaries attached to a memory_root."""
    from arborist.memory import branches_for, latest

    conn = connect(args.db)
    try:
        if args.root:
            root = args.root
        else:
            row = latest(conn)
            if row is None:
                print(json.dumps([], indent=2))
                return 0
            root = row["memory_root"]
        out = branches_for(conn, root)
    finally:
        conn.close()

    # Decode summary_blob to text for human inspection.
    rendered = []
    for b in out:
        rec = dict(b)
        if isinstance(rec["summary_blob"], (bytes, bytearray)):
            rec["summary_blob"] = rec["summary_blob"].decode(
                "utf-8", errors="replace"
            )
        rendered.append(rec)
    print(json.dumps(rendered, indent=2, ensure_ascii=False, default=str))
    return 0


def _cmd_memory_falsify(args: argparse.Namespace) -> int:
    """Mark a memory_root falsified."""
    from arborist.memory import falsify

    conn = connect(args.db)
    try:
        with transaction(conn):
            event_hash = falsify(
                conn,
                args.root,
                reason=args.reason,
                triggering_branch_id=args.branch_id,
            )
    finally:
        conn.close()
    print(
        json.dumps(
            {
                "memory_root": args.root,
                "audit_event_hash": event_hash or None,
                "noop": event_hash == "",
            },
            indent=2,
            ensure_ascii=False,
        )
    )
    return 0


def _cmd_capital_summary(args: argparse.Namespace) -> int:
    """Aggregate capital_ledger totals; per-form sums + row count."""
    from arborist.capital import summary as capital_summary

    conn = connect(args.db)
    try:
        out = capital_summary(conn, op_type=args.op_type, since=args.since)
    finally:
        conn.close()
    print(json.dumps(out, indent=2, ensure_ascii=False))
    return 0


def _cmd_capital_op_cost(args: argparse.Namespace) -> int:
    """Per-form totals for one op_type."""
    from arborist.capital import op_cost

    conn = connect(args.db)
    try:
        out = op_cost(conn, args.op_type)
    finally:
        conn.close()
    print(json.dumps(out, indent=2, ensure_ascii=False))
    return 0


def _cmd_capital_top(args: argparse.Namespace) -> int:
    """Top-N op_types by total contribution to a single capital form."""
    from arborist.capital import top_by_form

    conn = connect(args.db)
    try:
        out = top_by_form(conn, args.form, limit=args.limit)
    finally:
        conn.close()
    print(json.dumps(out, indent=2, ensure_ascii=False))
    return 0


def _cmd_selfmodel_snapshot(args: argparse.Namespace) -> int:
    """Build a SelfModel from current store state and persist it.

    Reads the latest providence_cache row + audit_events to derive the
    canonical fields (model_profile_hash, governance_policy_hash,
    verifier_method_root, etc.) and writes one row to selfmodel_records
    + emits a selfmodel_snapshot_landed audit event.
    """
    from arborist.selfmodel import snapshot, store_snapshot

    conn = connect(args.db)
    try:
        with transaction(conn):
            sm = snapshot(conn)
            root = store_snapshot(conn, sm)
    finally:
        conn.close()
    print(json.dumps({"selfmodel_root": root}, indent=2, ensure_ascii=False))
    return 0


def _cmd_selfmodel_show(args: argparse.Namespace) -> int:
    """Print a SelfModel record by root, or the latest live one."""
    from arborist.selfmodel import latest, load
    from arborist.selfmodel.store import claims_for

    conn = connect(args.db)
    try:
        if args.root:
            row = load(conn, args.root)
        else:
            row = latest(conn)
        if row is None:
            print(json.dumps({"error": "no SelfModel found"}, indent=2))
            return 1
        body = row["body_blob"]
        if isinstance(body, (bytes, bytearray)):
            body = body.decode("utf-8", errors="replace")
        out = {
            "selfmodel_root": row["selfmodel_root"],
            "state": row["state"],
            "schema_version": row["schema_version"],
            "parent_selfmodel_root": row["parent_selfmodel_root"],
            "model_profile_hash": row["model_profile_hash"],
            "verifier_method_root": row["verifier_method_root"],
            "governance_policy_hash": row["governance_policy_hash"],
            "canonicalization_version": row["canonicalization_version"],
            "chunking_version": row["chunking_version"],
            "memory_root": row["memory_root"],
            "audit_event_hash": row["audit_event_hash"],
            "created_at": row["created_at"],
            "falsified_at": row["falsified_at"],
            "falsified_reason": row["falsified_reason"],
            "claims": claims_for(conn, row["selfmodel_root"]),
        }
    finally:
        conn.close()
    print(json.dumps(out, indent=2, ensure_ascii=False, default=str))
    return 0


def _cmd_selfmodel_falsify(args: argparse.Namespace) -> int:
    """Mark a SelfModel falsified, citing a reason."""
    from arborist.selfmodel import falsify

    conn = connect(args.db)
    try:
        with transaction(conn):
            event_hash = falsify(
                conn,
                args.root,
                reason=args.reason,
                triggering_claim_hash=args.claim_hash,
            )
    finally:
        conn.close()
    print(
        json.dumps(
            {
                "selfmodel_root": args.root,
                "audit_event_hash": event_hash or None,
                "noop": event_hash == "",
            },
            indent=2,
            ensure_ascii=False,
        )
    )
    return 0


def _cmd_selfmodel_list(args: argparse.Namespace) -> int:
    """List recent SelfModel rows, newest first."""
    conn = connect(args.db)
    try:
        rows = conn.execute(
            "SELECT selfmodel_root, state, model_profile_hash,"
            "       governance_policy_hash, created_at,"
            "       falsified_at, falsified_reason"
            " FROM selfmodel_records "
            " ORDER BY created_at DESC LIMIT ?",
            (args.limit,),
        ).fetchall()
    finally:
        conn.close()
    print(
        json.dumps(
            [dict(r) for r in rows], indent=2, ensure_ascii=False, default=str
        )
    )
    return 0


def _render_warrant_chain_tail(result: dict) -> str:
    """Compute a render-layer tail showing how many cited sources
    have a `warrant-resolver-v1` derivations chain back to a primary-
    source surface (ticket #000031 Phase 3).

    Returns ``""`` when no shards_dir is on the result dict, no
    chains exist, or the lookup fails — keeps the render path silent
    in absence of data.
    Returns ``" · warrant: 1 proven"`` (or higher count) when cited
    sources have Merkle-bound bindings to textbook surface chunks.

    The actual ladder upgrade — POINTER-LINKED → ANCHOR-WARRANTED →
    EVIDENCE-WARRANTED based on warrant chains — is intentionally
    NOT done here. This is a positive-signal render addition; the
    underlying audit_mode + violations stay as the verifier
    produced them.

    Reads ``result["_shards_dir"]`` (stashed by ``_cmd_query`` before
    render time). The render function takes only ``(result, question)``
    so the shards_dir threads through the result dict.
    """
    shards_dir = result.get("_shards_dir")
    if not shards_dir:
        return ""
    sources = result.get("sources") or []
    if not sources:
        return ""
    try:
        from arborist.qa.warrant_resolver import warrant_chains_for_sources

        chains = warrant_chains_for_sources(sources, shards_dir)
    except Exception:
        return ""
    if not chains:
        return ""
    n = len(chains)
    label = "warrant" if n == 1 else "warrants"
    return f" · {label}: {n} proven"


def _cmd_warrant_status(args: argparse.Namespace) -> int:
    """Read-only — show what surface chunks the citation resolver
    finds for each claim-pack record. JSON output. No DB writes.
    See ``arborist.qa.warrant_resolver`` for ticket #000031 Phase 2.
    """
    from arborist.qa.warrant_resolver import warrant_status

    shards_dir = args.global_shards_dir or args.shards_dir
    if not shards_dir:
        print("--shards-dir is required for warrant-status", file=sys.stderr)
        return 2
    results = warrant_status(shards_dir, limit=args.limit)
    out = []
    for r in results:
        out.append(
            {
                "record_root": r.record_root,
                "record_title": r.record_title,
                "source_reference": r.source_reference,
                "citations": [
                    {
                        "title": c.title,
                        "authors": list(c.authors),
                        "year": c.year,
                        "section": c.section,
                    }
                    for c in r.citations
                ],
                "matches": [
                    {
                        "shard": m.shard_path,
                        "document_root": m.document_root,
                        "document_title": m.document_title,
                        "chunk_id": m.chunk_id,
                        "score": m.score,
                        "snippet": m.snippet,
                    }
                    for m in r.matches
                ],
                "has_derivation": r.has_derivation,
            }
        )
    print(json.dumps(out, indent=2, ensure_ascii=False))
    return 0


def _cmd_warrant_resolve(args: argparse.Namespace) -> int:
    """Run the warrant resolver across all claim-pack records.
    Default dry-run (just summary counts); ``--write`` flag computes
    Merkle inclusion proofs for top matches and writes
    ``derivations`` rows. Idempotent at the database layer.
    """
    from arborist.qa.warrant_resolver import warrant_resolve

    shards_dir = args.global_shards_dir or args.shards_dir
    if not shards_dir:
        print("--shards-dir is required for warrant-resolve", file=sys.stderr)
        return 2
    summary = warrant_resolve(
        shards_dir,
        write=args.write,
        limit=args.limit,
        use_aliases=getattr(args, "use_aliases", False),
    )
    print(json.dumps(summary, indent=2))
    return 0


def _aliases_db_path(args: argparse.Namespace) -> str | None:
    """Resolve the aliases DB path. Default: shard 000 of the shards
    cluster (alongside claim-pack records). Operators can override
    via --aliases-db."""
    explicit = getattr(args, "aliases_db", None)
    if explicit:
        return str(explicit)
    shards_dir = args.global_shards_dir or getattr(args, "shards_dir", None)
    if not shards_dir:
        return None
    return str(Path(shards_dir) / "000.db")


def _cmd_alias_citation_add(args: argparse.Namespace) -> int:
    """Add a citation alias. Refuses without --by; audit-discipline
    fail-closed at the API surface."""
    from arborist.qa.aliases import add_citation_alias

    db = _aliases_db_path(args)
    if not db:
        print("--shards-dir or --aliases-db is required", file=sys.stderr)
        return 2
    try:
        inserted = add_citation_alias(
            db,
            original_ref=args.original,
            substitute_ref=args.substitute,
            substitute_authors=args.author or [],
            substitute_title=args.title or "",
            decision_by=args.by,
            decision_rationale=args.rationale or "",
        )
    except ValueError as exc:
        print(f"alias add failed: {exc}", file=sys.stderr)
        return 2
    print(json.dumps({"inserted": inserted, "db": db}, indent=2))
    return 0


def _cmd_alias_citation_list(args: argparse.Namespace) -> int:
    from arborist.qa.aliases import list_citation_aliases

    db = _aliases_db_path(args)
    if not db:
        print("--shards-dir or --aliases-db is required", file=sys.stderr)
        return 2
    aliases = list_citation_aliases(db, original_filter=args.filter)
    print(
        json.dumps(
            [
                {
                    "original_ref": a.original_ref,
                    "substitute_ref": a.substitute_ref,
                    "substitute_authors": list(a.substitute_authors),
                    "substitute_title": a.substitute_title,
                    "decision_at": a.decision_at,
                    "decision_by": a.decision_by,
                    "decision_rationale": a.decision_rationale,
                }
                for a in aliases
            ],
            indent=2,
            ensure_ascii=False,
        )
    )
    return 0


def _cmd_alias_citation_remove(args: argparse.Namespace) -> int:
    from arborist.qa.aliases import remove_citation_alias

    db = _aliases_db_path(args)
    if not db:
        print("--shards-dir or --aliases-db is required", file=sys.stderr)
        return 2
    removed = remove_citation_alias(db, args.original, args.substitute)
    print(json.dumps({"removed": removed, "db": db}, indent=2))
    return 0


def _cmd_alias_term_add(args: argparse.Namespace) -> int:
    """Add a term alias. Refuses without --by; audit-discipline
    fail-closed at the API surface."""
    from arborist.qa.aliases import add_term_alias

    db = _aliases_db_path(args)
    if not db:
        print("--shards-dir or --aliases-db is required", file=sys.stderr)
        return 2
    try:
        inserted = add_term_alias(
            db,
            term=args.term,
            alternate_term=args.alternate,
            domain=args.domain,
            decision_by=args.by,
            decision_rationale=args.rationale or "",
        )
    except ValueError as exc:
        print(f"alias add failed: {exc}", file=sys.stderr)
        return 2
    print(json.dumps({"inserted": inserted, "db": db}, indent=2))
    return 0


def _cmd_alias_term_list(args: argparse.Namespace) -> int:
    from arborist.qa.aliases import list_term_aliases

    db = _aliases_db_path(args)
    if not db:
        print("--shards-dir or --aliases-db is required", file=sys.stderr)
        return 2
    aliases = list_term_aliases(db, domain=args.domain, term_filter=args.filter)
    print(
        json.dumps(
            [
                {
                    "term": a.term,
                    "alternate_term": a.alternate_term,
                    "domain": a.domain,
                    "decision_at": a.decision_at,
                    "decision_by": a.decision_by,
                    "decision_rationale": a.decision_rationale,
                }
                for a in aliases
            ],
            indent=2,
            ensure_ascii=False,
        )
    )
    return 0


def _cmd_alias_term_remove(args: argparse.Namespace) -> int:
    from arborist.qa.aliases import remove_term_alias

    db = _aliases_db_path(args)
    if not db:
        print("--shards-dir or --aliases-db is required", file=sys.stderr)
        return 2
    removed = remove_term_alias(db, args.term, args.alternate, args.domain)
    print(json.dumps({"removed": removed, "db": db}, indent=2))
    return 0


def _cmd_sweep(args: argparse.Namespace) -> int:
    """Unconscious sweep — drain the meta-cognition backlog.

    Implements the warrant-resolver-only fragment of #000037 §3.1
    Target B (documents that bypassed meta-cognition at ingest
    time). The full bicameral sweep (canonical-projection probe
    / freshness probe / document-content witness fan-out) is
    gated on the §12 phase trigger and a small schema migration
    (`documents.last_swept_at`); `--target warrants` here is
    the no-schema-change increment available today.

    Idempotent: re-running on the same shards is a no-op at the
    database layer (PK collision on
    `(core_root, src_root, process_id)` per the existing
    `derivations` table). Operators can run on a cron / systemd
    timer without worrying about row proliferation.
    """
    from arborist.qa.warrant_resolver import warrant_resolve

    shards_dir = args.global_shards_dir or args.shards_dir
    if not shards_dir:
        print("--shards-dir is required for sweep", file=sys.stderr)
        return 2

    if args.target == "warrants":
        summary = warrant_resolve(
            shards_dir,
            write=args.write,
            limit=args.limit,
            use_aliases=getattr(args, "use_aliases", False),
        )
        summary["target"] = "warrants"
        summary["mode"] = "write" if args.write else "dry-run"
        print(json.dumps(summary, indent=2))
        return 0

    if args.target == "all":
        # Full bicameral sweep awaits #000037 §12 phase trigger
        # (canonical-projection probe + freshness probe + document-
        # content witness fan-out + documents.last_swept_at schema
        # migration). Today this is a no-op with an honest message.
        print(
            json.dumps(
                {
                    "target": "all",
                    "status": "deferred",
                    "reason": (
                        "Full bicameral sweep requires #000037 §12 "
                        "phase trigger + documents.last_swept_at "
                        "migration. Use --target warrants for the "
                        "available increment today."
                    ),
                },
                indent=2,
            )
        )
        return 0

    print(f"unknown sweep target: {args.target}", file=sys.stderr)
    return 2


def _cmd_mesh_status(args: argparse.Namespace) -> int:
    """Show mesh state: enabled flag, identity, current epoch, roster."""
    from arborist.mesh import current_epoch, is_enabled, load_identity
    from arborist.mesh.state import roster_at

    conn = connect(args.db)
    try:
        ident = load_identity(conn)
        epoch = current_epoch(conn)
        roster = roster_at(conn, epoch) if epoch is not None else []
        out = {
            "enabled": is_enabled(conn),
            "identity": (
                {
                    "member_id": ident.member_id,
                    "group_name": ident.group_name,
                    "sign_pub_hex": ident.sign_pub.hex(),
                    "dh_pub_hex": ident.dh_pub.hex(),
                    "created_at": ident.created_at,
                }
                if ident
                else None
            ),
            "current_epoch": epoch,
            "roster": [
                {
                    "member_id": m.member_id,
                    "role": m.role,
                    "sign_pub_hex": m.sign_pub.hex(),
                    "dh_pub_hex": m.dh_pub.hex(),
                }
                for m in roster
            ],
        }
    finally:
        conn.close()
    print(json.dumps(out, indent=2, ensure_ascii=False))
    return 0


def _cmd_mesh_init(args: argparse.Namespace) -> int:
    from arborist.mesh import init_identity

    conn = connect(args.db)
    try:
        ident = init_identity(conn, group_name=args.group, member_id=args.member_id)
    except RuntimeError as e:
        print(f"error: {e}", file=sys.stderr)
        conn.close()
        return 2
    finally:
        conn.close()
    print(
        json.dumps(
            {
                "member_id": ident.member_id,
                "group_name": ident.group_name,
                "sign_pub_hex": ident.sign_pub.hex(),
                "dh_pub_hex": ident.dh_pub.hex(),
                "note": "share sign_pub_hex + dh_pub_hex with the founder of any group "
                "you want to join. Run 'mesh enable' to flip the gating flag on.",
            },
            indent=2, ensure_ascii=False
        )
    )
    return 0


def _cmd_mesh_enable(args: argparse.Namespace) -> int:
    from arborist.mesh import set_enabled

    conn = connect(args.db)
    try:
        set_enabled(conn, True)
    finally:
        conn.close()
    print(json.dumps({"enabled": True}, indent=2, ensure_ascii=False))
    return 0


def _cmd_mesh_disable(args: argparse.Namespace) -> int:
    from arborist.mesh import set_enabled

    conn = connect(args.db)
    try:
        set_enabled(conn, False)
    finally:
        conn.close()
    print(json.dumps({"enabled": False}, indent=2, ensure_ascii=False))
    return 0


def _cmd_mesh_members(args: argparse.Namespace) -> int:
    from arborist.mesh import current_epoch
    from arborist.mesh.state import roster_at

    conn = connect(args.db)
    try:
        epoch = current_epoch(conn)
        if epoch is None:
            print(json.dumps({"error": "mesh not initialized"}, indent=2, ensure_ascii=False))
            return 2
        roster = roster_at(conn, epoch)
    finally:
        conn.close()
    print(
        json.dumps(
            {
                "epoch": epoch,
                "members": [
                    {
                        "member_id": m.member_id,
                        "role": m.role,
                        "sign_pub_hex": m.sign_pub.hex(),
                        "dh_pub_hex": m.dh_pub.hex(),
                    }
                    for m in roster
                ],
            },
            indent=2, ensure_ascii=False
        )
    )
    return 0


def _cmd_mesh_add(args: argparse.Namespace) -> int:
    from arborist.mesh.members import add_member

    try:
        sign_pub = bytes.fromhex(args.sign_pub)
        dh_pub = bytes.fromhex(args.dh_pub)
    except ValueError:
        print("error: --sign-pub and --dh-pub must be hex-encoded 32-byte keys", file=sys.stderr)
        return 2
    if len(sign_pub) != 32 or len(dh_pub) != 32:
        print("error: keys must decode to exactly 32 bytes", file=sys.stderr)
        return 2

    conn = connect(args.db)
    try:
        epoch = add_member(
            conn,
            member_id=args.member_id,
            sign_pub=sign_pub,
            dh_pub=dh_pub,
            role=args.role,
        )
    except (PermissionError, RuntimeError, ValueError) as e:
        print(f"error: {e}", file=sys.stderr)
        return 2
    finally:
        conn.close()
    print(json.dumps({"new_epoch": epoch, "added": args.member_id}, indent=2, ensure_ascii=False))
    return 0


def _cmd_mesh_kick(args: argparse.Namespace) -> int:
    from arborist.mesh.members import kick_member

    conn = connect(args.db)
    try:
        epoch = kick_member(conn, member_id=args.member_id, reason=args.reason)
    except (PermissionError, RuntimeError, ValueError) as e:
        print(f"error: {e}", file=sys.stderr)
        return 2
    finally:
        conn.close()
    print(
        json.dumps(
            {"new_epoch": epoch, "kicked": args.member_id, "reason": args.reason},
            indent=2, ensure_ascii=False
        )
    )
    return 0


def _cmd_mesh_rotate(args: argparse.Namespace) -> int:
    from arborist.mesh.members import scheduled_rotate

    conn = connect(args.db)
    try:
        epoch = scheduled_rotate(conn, reason=args.reason)
    except (PermissionError, RuntimeError) as e:
        print(f"error: {e}", file=sys.stderr)
        return 2
    finally:
        conn.close()
    print(json.dumps({"new_epoch": epoch, "reason": args.reason}, indent=2, ensure_ascii=False))
    return 0


def _cmd_mesh_serve(args: argparse.Namespace) -> int:
    """Run the HTTP gossip server until SIGINT."""
    from arborist.mesh import is_enabled, load_identity
    from arborist.mesh.wire import MeshWireServer

    conn = connect(args.db)
    try:
        if load_identity(conn) is None:
            print("error: mesh not initialized; run 'arborist mesh init' first", file=sys.stderr)
            return 2
        if not is_enabled(conn):
            print("error: mesh.enabled is off; run 'arborist mesh enable' first", file=sys.stderr)
            return 2
    finally:
        conn.close()

    srv = MeshWireServer(args.db, host=args.host, port=args.port)
    print(json.dumps({"status": "serving", "url": srv.url, "db": str(args.db)}, ensure_ascii=False))
    sys.stdout.flush()
    try:
        srv.serve()
    except KeyboardInterrupt:
        print(json.dumps({"status": "stopped", "reason": "SIGINT"}, ensure_ascii=False))
    finally:
        srv.stop()
    return 0


def _cmd_mesh_sync(args: argparse.Namespace) -> int:
    """Push local roots + falsifications to a peer.

    Two pushes happen by default (unless ``--no-roots`` /
    ``--no-falsifications`` opts one out):

    1. **ANNOUNCE_ROOT** for the most-recent ``--limit`` documents
       older than the kindergarten window. Receivers dedup by
       ``documents.document_root``.
    2. **ANNOUNCE_FALSIFICATION** for the most-recent ``--limit``
       falsifications older than the kindergarten window. Burns
       deliberately NOT propagated — local kindergarten cleanup.

    **Kindergarten window.** Records younger than
    ``--kindergarten-seconds`` (default 3600 = 1 hour) are NOT
    broadcast. Gives the operator time to inspect a fresh ingest or
    falsification & burn it before the network sees it. Override per
    invocation; ``--kindergarten-seconds 0`` broadcasts everything
    (cron-friendly opt-out for operators who prefer immediate
    propagation). The window is sender-side discipline; receivers
    don't enforce it because they have no view into when the sender
    created the record.

    Receivers verify the Ed25519 signature, run per-peer chain-of-
    claims fork detection, then write one ``mesh_received`` audit
    event per accepted envelope. Duplicate broadcasts produce
    duplicate audit-log entries on the receiver but no state
    corruption.
    """
    import time as _time

    from arborist.mesh import is_enabled, load_identity
    from arborist.mesh.wire import MeshWireClient

    now_ts = int(_time.time())
    cutoff_ts = now_ts - max(0, args.kindergarten_seconds)

    conn = connect(args.db)
    try:
        if load_identity(conn) is None:
            print("error: mesh not initialized", file=sys.stderr)
            return 2
        if not is_enabled(conn):
            print("error: mesh.enabled is off", file=sys.stderr)
            return 2
        # Total counts inform skipped-by-kindergarten reporting.
        total_roots = 0
        total_falsifications = 0
        root_rows: list = []
        falsification_rows: list = []
        if not args.no_roots:
            total_roots = conn.execute(
                "SELECT COUNT(*) FROM documents"
            ).fetchone()[0]
            root_rows = conn.execute(
                "SELECT document_root, document_uri, chunking_version, "
                " canonicalization_version, schema_version "
                "FROM documents WHERE ingest_ts <= ? "
                "ORDER BY rowid DESC LIMIT ?",
                (cutoff_ts, args.limit),
            ).fetchall()
        if not args.no_falsifications:
            total_falsifications = conn.execute(
                "SELECT COUNT(*) FROM falsifications"
            ).fetchone()[0]
            falsification_rows = conn.execute(
                "SELECT cache_key, reason FROM falsifications "
                "WHERE at <= ? "
                "ORDER BY at DESC LIMIT ?",
                (cutoff_ts, args.limit),
            ).fetchall()
    finally:
        conn.close()

    # Skipped-by-kindergarten = (rows younger than cutoff that would have
    # been in the most-recent --limit) — approximated by total minus what
    # we pulled, capped at limit.
    fresh_roots_held = max(
        0,
        min(total_roots, args.limit) - len(root_rows),
    ) if not args.no_roots else 0
    fresh_falsifications_held = max(
        0,
        min(total_falsifications, args.limit) - len(falsification_rows),
    ) if not args.no_falsifications else 0

    sent_roots: list[dict] = []
    sent_falsifications: list[dict] = []
    errors: list[dict] = []
    with MeshWireClient(args.db, args.peer) as client:
        try:
            peer_info = client.info()
        except Exception as e:
            print(json.dumps({"status": "peer_unreachable", "peer": args.peer, "error": str(e)}, indent=2, ensure_ascii=False))
            return 2
        for r in root_rows:
            try:
                resp = client.announce_root(
                    document_root=r["document_root"],
                    source_uri=r["document_uri"],
                    chunking_version=r["chunking_version"],
                    canonicalization_version=r["canonicalization_version"],
                    schema_version=r["schema_version"],
                )
                sent_roots.append({"document_root": r["document_root"], "ack": resp})
            except Exception as e:
                errors.append({"document_root": r["document_root"], "error": str(e)})
        for f in falsification_rows:
            try:
                resp = client.announce_falsification(
                    cache_key=f["cache_key"],
                    reason=f["reason"] or "",
                )
                sent_falsifications.append({"cache_key": f["cache_key"], "ack": resp})
            except Exception as e:
                errors.append({"cache_key": f["cache_key"], "error": str(e)})

    print(json.dumps({
        "status": "synced",
        "peer": args.peer,
        "peer_member_id": peer_info.get("member_id"),
        "peer_epoch": peer_info.get("current_epoch"),
        "kindergarten_seconds": args.kindergarten_seconds,
        "announced_roots": len(sent_roots),
        "announced_falsifications": len(sent_falsifications),
        "kindergarten_held_roots": fresh_roots_held,
        "kindergarten_held_falsifications": fresh_falsifications_held,
        "errors": len(errors),
        "sent_roots": sent_roots[: args.verbose],
        "sent_falsifications": sent_falsifications[: args.verbose],
        "error_samples": errors[:5],
    }, indent=2, ensure_ascii=False))
    return 0 if not errors else 1


def _cmd_mesh_pull(args: argparse.Namespace) -> int:
    """Pull a single document body from a peer by document_root.

    Closes the request half of the gossip loop. The wire client already
    verifies the peer's signature and re-derives the Merkle root from the
    delivered leaves before returning. This verb then re-ingests the
    delivered text through the standard ingest path so chunking_version /
    canonicalization_version stay consistent — and rejects with rc=2 if
    the local re-ingest produces a different document_root than requested.
    """
    from arborist.document import Document
    from arborist.ingest import ingest_source
    from arborist.mesh import is_enabled, load_identity
    from arborist.mesh.wire import MeshWireClient

    conn = connect(args.db)
    try:
        if load_identity(conn) is None:
            print("error: mesh not initialized", file=sys.stderr)
            return 2
        if not is_enabled(conn):
            print("error: mesh.enabled is off", file=sys.stderr)
            return 2
        already = conn.execute(
            "SELECT document_root, document_uri FROM documents WHERE document_root=?",
            (args.root,),
        ).fetchone()
    finally:
        conn.close()

    if already is not None:
        print(json.dumps({
            "status": "already_present",
            "document_root": already["document_root"],
            "document_uri": already["document_uri"],
            "shard": str(args.db),
        }, indent=2, ensure_ascii=False))
        return 0

    try:
        with MeshWireClient(args.db, args.peer) as client:
            body = client.request_body(root=args.root)
    except Exception as e:
        print(f"error: pull failed: {e}", file=sys.stderr)
        return 2

    delivered_uri = body.get("document_uri") or ""
    delivered_text = body.get("text") or ""

    class _PulledSource:
        source_type = "mesh_pull"

        def iter_documents(self):
            yield Document(
                uri=delivered_uri,
                content=delivered_text,
                source_type="mesh_pull",
                title=None,
            )

    conn = connect(args.db)
    try:
        ingest_source(conn, _PulledSource())
        row = conn.execute(
            "SELECT document_root FROM documents WHERE document_root=?",
            (args.root,),
        ).fetchone()
        if row is None:
            # Re-ingest produced a different root than the peer claimed.
            # The pulled text doesn't reproduce the requested root under
            # this peer's chunker/canonicalization. Fail closed.
            actual = conn.execute(
                "SELECT document_root FROM documents WHERE document_uri=? "
                "ORDER BY ingest_ts DESC LIMIT 1",
                (delivered_uri,),
            ).fetchone()
            actual_root = actual["document_root"] if actual else None
            print(
                "error: local re-ingest produced "
                f"{actual_root!r}, expected {args.root!r}",
                file=sys.stderr,
            )
            return 2
        with transaction(conn):
            event_hash = append_audit(
                conn,
                event_type="mesh_pulled",
                body={
                    "document_root": args.root,
                    "document_uri": delivered_uri,
                    "peer": args.peer,
                },
                subject_root=args.root,
            )
    finally:
        conn.close()

    print(json.dumps({
        "status": "pulled",
        "document_root": args.root,
        "document_uri": delivered_uri,
        "shard": str(args.db),
        "audit_event_hash": event_hash,
    }, indent=2, ensure_ascii=False))
    return 0


def _cmd_crawl(args: argparse.Namespace) -> int:
    """BFS-discover same-domain URLs from a seed and optionally ingest.

    Two modes:

    - default: print discovered URLs to stdout (one per line). Compose
      with `arborist ingest --source html` if you want to feed them
      through the standard ingest path manually.
    - ``--ingest``: run the discovery + ingest path in a single shot,
      capturing ETag + Last-Modified per page so a future
      ``crawler recrawl-check`` can do conditional HEADs.
    """
    try:
        from arborist.sources.crawler.bridge import crawl_seed, ingest_crawled
    except ImportError as e:
        print(f"error: {e}", file=sys.stderr)
        return 2

    from arborist.progress import Progress

    cap = "no cap" if args.max_pages == 0 else f"max {args.max_pages}"
    speed = "fast" if args.fast else "polite"
    print(
        f"  crawl: seed={args.seed_url} depth={args.depth} {cap} ({speed})",
        file=sys.stderr,
        flush=True,
    )
    crawl_progress = Progress(prefix="crawl ")
    urls = crawl_seed(
        args.seed_url,
        max_depth=args.depth,
        max_pages=args.max_pages,
        progress=crawl_progress,
        fast=args.fast,
    )
    print(
        f"  crawl: discovery done — {len(urls)} URLs",
        file=sys.stderr,
        flush=True,
    )

    if not args.ingest:
        for u in urls:
            print(u)
        return 0

    print(
        f"  ingest: starting on {len(urls)} URLs",
        file=sys.stderr,
        flush=True,
    )
    ingest_progress = Progress(prefix="ingest ", total_estimate=len(urls))
    conn = connect(args.db)
    try:
        result = ingest_crawled(
            conn,
            urls,
            progress=ingest_progress,
            default_author=getattr(args, "author", None),
        )
    finally:
        conn.close()
    print(json.dumps(
        {
            "status": "crawled_and_ingested",
            "seed": args.seed_url,
            "depth": args.depth,
            "max_pages": args.max_pages,
            "discovered": len(urls),
            **result,
        },
        indent=2, ensure_ascii=False
    ))
    return 0


def _cmd_crawler_recrawl_check(args: argparse.Namespace) -> int:
    """Send conditional HEAD requests for ingested documents.

    Reports each as fresh (304), stale (200, body changed), gone
    (404/410), or unreachable. Updates `document_http_meta.last_status`
    and `last_checked_at` so consecutive runs target the oldest checks
    first.
    """
    try:
        from arborist.sources.crawler.bridge import recrawl_check
    except ImportError as e:
        print(f"error: {e}", file=sys.stderr)
        return 2

    conn = connect(args.db)
    try:
        result = recrawl_check(
            conn,
            domain=args.domain,
            limit=args.limit,
        )
    finally:
        conn.close()
    print(json.dumps(result, indent=2, ensure_ascii=False))
    return 0


[docs] def build_parser() -> argparse.ArgumentParser: p = argparse.ArgumentParser( prog="arborist", description="An arborist for trees and forests of cross-linked information.", ) p.add_argument("--version", action="version", version=f"arborist {__version__}") p.add_argument( "--db", type=Path, default=DEFAULT_DB_PATH, help=f"path to arborist SQLite db (default: {DEFAULT_DB_PATH})", ) p.add_argument( "--shards-dir", dest="global_shards_dir", default=None, help=( "for read commands: attach all shards in this directory and " "expose them as UNION views over the standard tables" ), ) sub = p.add_subparsers(dest="cmd", required=True) # Optional sqlite-vec backend (#000039): only surface --embed / # `arborist embed` / `search --backend vec` when the [vec] extra # is installed. try: from arborist.search import VEC_AVAILABLE as _vec_ok except Exception: # pragma: no cover _vec_ok = False ingest = sub.add_parser("ingest", help="ingest documents from a source") ingest.add_argument( "--source", required=True, choices=[ "wikipedia_cur", "wikipedia_old", "wikipedia_xml", "wikipedia_xml_history", "wikipedia_abstract", "html", "grok_export", "grok_media", "git_repo", "hg_repo", "providence", "claim_pack", "textbook_tex", ], help="source type", ) ingest.add_argument( "--kindergarten-seconds", type=int, default=None, help=( "(providence source only) records younger than this many " "seconds stay opaque to ingestion — fresh thoughts cool " "before they become substrate. Default 3600s (1h)." ), ) ingest.add_argument( "--path", help=( "path to dump file (wikipedia) or to xAI export root / " "prod-grok-backend.json (grok_export, grok_media)" ), ) ingest.add_argument( "--url", action="append", help="URL to ingest (html source; repeatable)" ) ingest.add_argument( "--bundle", action="append", help=( "JSON bundle path (claim_pack source; repeatable). Provide axiom " "and theorem bundles together so cross-bundle pillar_reference " "edges resolve to specific record URIs." ), ) ingest.add_argument( "--urls-from", dest="urls_from", help="file with one URL per line (html source)", ) ingest.add_argument( "--no-robots", dest="no_robots", action="store_true", help="do not consult robots.txt (use only for explicitly authorized sites)", ) ingest.add_argument( "--author", default=None, help=( "default author surname for the html / textbook_tex sources " "(#000031 Phase 1 follow-up). Appended to document title as " "'<title>, by <author>' so the warrant resolver's " "_shard_matches_citation heuristic finds the surname in " "the title haystack. Wikisource and PG HTML pages rarely " "include author in <title>; the manifest carries it instead." ), ) ingest.add_argument( "--chunker", default=None, help="chunker name (default: tok-512-v1)" ) ingest.add_argument( "--limit", type=int, default=None, help="cap number of documents" ) ingest.add_argument( "--batch-size", dest="batch_size", type=int, default=200, help="documents per SQLite transaction (default 200)", ) ingest.add_argument( "--shard", default=None, help=( "rank/total — yield only every N-th doc for parallel ingest. " "spawn N processes, each with --shard 0/N, 1/N, ... they " "parallelize parser CPU and serialize writes via WAL" ), ) ingest.add_argument( "--shards-dir", dest="shards_dir", default=None, help=( "directory for attach-forever sharding. With --shard rank/total, " "writes to shards-dir/<rank>.db instead of --db, removing the " "WAL writer-lock contention entirely. Reads via arborist --shards-dir" ), ) ingest.add_argument( "--resume", action="store_true", help=( "rsync-style: read each source's last high-water mark from this " "DB's meta table and skip rows whose id is <= it. Safe to kill " "and restart at any time" ), ) ingest.add_argument( "--quiet", action="store_true", help="suppress periodic stderr progress output", ) ingest.add_argument( "--progress-interval", dest="progress_interval", type=float, default=2.0, help="seconds between stderr progress lines (default 2.0)", ) ingest.add_argument( "--total-estimate", dest="total_estimate", type=int, default=None, help=( "estimated total docs the source will yield. enables percent " "+ ETA in progress output" ), ) ingest.add_argument( "--no-loss-report", dest="no_loss_report", action="store_true", help=( "disable adapter LossReport sidecar (ticket #000022). " "Default: enabled. Sidecar — toggling does NOT invalidate " "QA cache_keys" ), ) ingest.add_argument( "--no-loss-excerpts", dest="no_loss_excerpts", action="store_true", help=( "drop sample_excerpt content from LossReport rows " "(PII-paranoid mode). sample_hash stays populated" ), ) ingest.add_argument( "--loss-excerpt-bytes", dest="loss_excerpt_bytes", type=int, default=200, help="max excerpt byte length (default 200)", ) if _vec_ok: ingest.add_argument( "--embed", action="store_true", help=( "after ingest, embed the chunks this run added (incremental " "chunk_vecs population — #000039; default ingest does NOT " "embed, the lazy `arborist embed` pass / cron / Prometheus-Σ " "sweep is the usual path)" ), ) ingest.set_defaults(func=_cmd_ingest) search = sub.add_parser("search", help="keyword/semantic search (UNGROUNDED audit mode)") search.add_argument("query", help="query string") search.add_argument("--limit", type=int, default=20) search.add_argument("--json", action="store_true", help="output JSON") _search_backends = ["fts5"] + (["vec"] if _vec_ok else []) search.add_argument( "--backend", choices=_search_backends, default="fts5", help=( "retrieval backend: 'fts5' (BM25 lexical, default) or 'vec' " "(semantic ANN over chunk_vecs — requires [vec] extra + a populated " "shard via `arborist embed`)" if "vec" in _search_backends else "retrieval backend: 'fts5' (BM25 lexical; 'vec' needs the [vec] extra)" ), ) search.set_defaults(func=_cmd_search) if "vec" in _search_backends: embed_cmd = sub.add_parser( "embed", help="populate chunk_vecs (semantic embeddings) for --db [vec extra]", ) embed_cmd.add_argument( "--limit", type=int, default=None, help="cap the number of chunks embedded (smoke-test on a real shard)", ) embed_cmd.add_argument("--batch-size", type=int, default=256) embed_cmd.add_argument( "--quant", choices=["float32", "int8"], default="float32", help=( "vector quantization: float32 (default — +25%% corpus tax, max " "fidelity) or int8 (~4x smaller, +6%% tax, ~1-3%% recall hit — " "the recommended production default; switching quant on an " "existing chunk_vecs requires --rebuild)" ), ) embed_cmd.add_argument( "--rebuild", action="store_true", help=( "DROP + recreate chunk_vecs (at --quant), then full re-embed — " "the clean version-bump path and the only way to switch quant " "(default is incremental: embed only chunks not already in " "chunk_vecs)" ), ) embed_cmd.set_defaults(func=_cmd_embed) verify = sub.add_parser( "verify", help="round-trip Merkle proofs for N random documents" ) verify.add_argument("-n", type=int, default=10) verify.set_defaults(func=_cmd_verify) distill = sub.add_parser( "distill", help="compress docs into Merkle-signed cores (surface->core or core->core)", ) distill.add_argument( "--process", default="first-sentence-v1", help="distiller name" ) distill.add_argument( "--kind", choices=["surface", "core"], default="surface", help="source kind to scan; 'core' runs recursive distillation", ) distill.add_argument( "--source-type", dest="source_type", default=None, help="restrict to one source_type", ) distill.add_argument( "--chunker", default=None, help="chunker for the core doc" ) distill.add_argument( "--limit", type=int, default=None, help="cap number of docs scanned" ) distill.add_argument( "--batch-size", dest="batch_size", type=int, default=200, help="cores written per SQLite transaction (default 200)", ) distill.set_defaults(func=_cmd_distill) ask_cmd = sub.add_parser( "ask", help="answer a question about a document (cache-first, STRICT)", ) ask_cmd.add_argument( "--document-root", dest="document_root", required=True, help="document_root to ask about", ) ask_cmd.add_argument( "--question", required=True, help="question text" ) ask_cmd.add_argument( "--model", default=None, help="model_id (default $ARBORIST_LLM_MODEL or hermes-3)", ) ask_cmd.add_argument( "--endpoint", default=None, help="OpenAI-compatible base URL (default $ARBORIST_LLM_ENDPOINT)", ) ask_cmd.add_argument( "--dry-run", dest="dry_run", action="store_true", help="use StubClient — no network call", ) ask_cmd.add_argument( "--answer-mode", dest="answer_mode", default=None, choices=["quote", "claim_lattice_pointer", "claim_lattice"], help=( "answer schema. See `query --answer-mode` for full semantics. " "Default 'quote'; 'claim_lattice_pointer' enables quote-by-pointer; " "'claim_lattice' is the JSON variant (vLLM guided_json + lenient " "pre-parser; pairs with grammar-constrained inference)." ), ) ask_cmd.set_defaults(func=_cmd_ask) query_cmd = sub.add_parser( "query", help="multi-source RAG: question -> top-K corpus docs -> Hermes -> cache", ) query_cmd.add_argument("question", help="the question to ask") query_cmd.add_argument( "--top-k", dest="top_k", type=int, default=8, help="max distinct source documents in context (default 8)", ) query_cmd.add_argument( "--over-fetch", dest="over_fetch", type=int, default=32, help="FTS5 hits to fetch per shard before dedup (default 32)", ) query_cmd.add_argument( "--max-context-chars", dest="max_context_chars", type=int, default=None, help=( "cap on assembled context bytes. When omitted, falls back to " "the per-mode default in DEFAULT_QUERY_POLICY['max_context_chars_by_mode'] " "(quote=24000, claim_lattice_pointer=24000, claim_lattice=48000). " "Sprint 1b 2026-05-02 — peaks measured per mode." ), ) query_cmd.add_argument( "--question-dedup", dest="question_dedup", default=None, choices=["strict", "equivalence_class"], help=( "write-time question canonicalization. 'equivalence_class' " "(default) collapses articles + trailing-punct + case so " "variants share cache_keys. 'strict' keeps every variant " "distinct (audit-grade)." ), ) query_cmd.add_argument( "--fidelity", dest="fidelity", default=None, choices=["strict", "equivalence_class"], help=( "lookup tolerance. 'equivalence_class' (default) tries the " "primary cache_key then falls back to the alternate dedup " "mode's cache_key. 'strict' refuses fallback." ), ) query_cmd.add_argument( "--qa-db", dest="qa_db", default=None, help=( "providence_cache target DB. default: <shards-dir>/qa.db, or " "~/.arborist/qa.db when no shards-dir" ), ) query_cmd.add_argument( "--model", default=None, help="model_id (default $ARBORIST_LLM_MODEL or hermes-3)", ) query_cmd.add_argument( "--endpoint", default=None, help="OpenAI-compatible base URL (default $ARBORIST_LLM_ENDPOINT)", ) query_cmd.add_argument( "--dry-run", dest="dry_run", action="store_true", help="use StubClient — assembles context but skips the LLM call", ) query_cmd.add_argument( "--json", action="store_true", help="emit the raw record as indented JSON (default: human render)", ) query_cmd.add_argument( "--burn", action="store_true", help=( "delete the matching live providence_cache row BEFORE lookup, " "forcing a fresh inference. Test-ergonomic — see new behavior " "without finding cache_keys by hand. Writes a providence_burn " "audit event." ), ) query_cmd.add_argument( "--repair", action="store_true", help=( "enable mechanical repair after first verify (off by " "default). When the verdict is HYBRID/UNGROUNDED, applies " "synthetic_elision split, trailing_artifact trim, and " "no_overlap drop deterministically; persists the repaired " "answer with a providence_repair audit event." ), ) query_cmd.add_argument( "--repair-reprompts", dest="repair_reprompts", type=int, default=0, help=( "max LLM re-prompt iterations after mechanical repair " "(default 0 = no re-prompt). Each iteration sends a feedback " "turn naming failed quotes; the model is asked to rewrite " "using only verbatim citations. Requires --repair." ), ) query_cmd.add_argument( "--answer-mode", dest="answer_mode", default=None, choices=["quote", "claim_lattice_pointer", "claim_lattice"], help=( "answer schema. 'quote' (default): model writes prose with " "verbatim quote spans inline. 'claim_lattice_pointer' (G0 " "/ CTI quote-by-pointer): runtime builds a labeled evidence " "map (E1, E2, …); model writes pointer-line prose ('Claim. " "[E12]'); renderer interpolates literal spans. Synthetic-" "elision-by-construction-impossible. No repair loop " "(one-shot discipline). 'claim_lattice' is the JSON variant " "(vLLM guided_json + lenient pre-parser; pairs with grammar-" "constrained inference like Qwen 3.6 reasoner / Claude / GPT-4)." ), ) query_cmd.add_argument( "--retrieval-keywords", dest="retrieval_keywords", default=None, help=( "operator-supplied keywords appended to the question for " "FTS5 retrieval ONLY — never sent to the LLM, never enters " "cache_key, never reaches the verifier. Use to narrow " "OR-mode retrieval on long discursive questions whose " "content tokens get diluted by template phrasing. Example: " "make query Q='what tech may enable one person to " "reconstruct another person's thoughts...' " "K='transcranial knowledge acquisition'. Pair with --burn " "to force fresh inference (keywords are session-only and " "cache-hits ignore them)." ), ) # Ticket #000008 Phase 4 — quantifier-guard CLI flags. Each # corresponds to a level of the §10.11.2 disable hierarchy. query_cmd.add_argument( "--no-quantifier-guard", dest="no_quantifier_guard", action="store_true", help=( "Disable the broad-quantifier preflight guard for this " "call. Overrides quantifier_guard_enabled in policy. " "Bench-side telemetry (quantifier_intensity, etc.) goes " "to None for the row. Use when the guard misclassifies." ), ) query_cmd.add_argument( "--allow-broad", dest="allow_broad", action="store_true", help=( "Emergent-search mode: keep the classifier on (telemetry " "stays useful) but don't apply caps. For broad questions " "where the operator wants exploratory enumeration, not " "grounded completeness." ), ) query_cmd.add_argument( "--reject-broad", dest="reject_broad", action="store_true", help=( "Strict mode: when intensity is ALL/COMPREHENSIVE/" "OPEN_REQUEST AND scope_bound_hint is unbounded, return " "UNGROUNDED before the LLM call with a " "BROAD_QUANTIFIER_REJECTED violation. Saves ~10-15s on " "rejected runs. Bounded universals (e.g. all members of " "the Beatles) are NOT rejected." ), ) query_cmd.add_argument( "--apply-quantifier-caps", dest="apply_quantifier_caps", action="store_true", help=( "Flip the dry-run gate per-call. By default Phase 2 " "lands with quantifier_guard_apply_caps=False so the " "cap is reported on the result but not applied to the " "verifier. This flag enables actual cap enforcement " "for one call. Use after dry-run bench review confirms " "the classifier output across the question set." ), ) # Ticket #000001 §7 Phase 0 — cross-language guard (default OFF; # dry-run rollout discipline). Enable per-call to experiment. query_cmd.add_argument( "--crosslang-guard", dest="crosslang_guard", action="store_true", help=( "Enable the deterministic cross-language guard for this " "call (ticket #000001 §7 Phase 0, default OFF). A query " "with a non-English signal (¿/¡/non-ASCII letter) has " "es-v1 function words stripped from the retrieval set so " "they can't drive an OR-mode full-corpus FTS5 scan; if no " "corpus-language content token survives, returns " "UNGROUNDED before retrieval/LLM with a " "CROSS_LANGUAGE_UNSUPPORTED violation. Retrieval-side " "only — never touches the verifier / cache_key / " "question_hash. Provably inert on English." ), ) query_cmd.add_argument( "--crosslang-translate", dest="crosslang_translate", action="store_true", help=( "Operation Sandwich (ticket #000056, default OFF; implies " "--crosslang-guard). When the non-English signal fires, " "translate the query es→en (local [mt] opus-mt; the " "English article ranks primary AND the LLM is prompted in " "English), ground the English answer with the UNTOUCHED " "verifier, then render the verified English answer back " "es as DISPLAY-ONLY (banner-labelled, zero grounding). " "Needs the [mt] extra; degrades to the Phase-0 guard if " "absent. Never touches the verifier / cache_key / " "question_hash / governance_policy_hash." ), ) # Ticket #000010 — meta-cognition CLI flags. query_cmd.add_argument( "--no-preflight", dest="no_preflight", action="store_true", help=( "Disable the meta-cognition preflight guard for this " "call. Skips temporal / contradiction / false-premise / " "out-of-corpus detectors. The QuestionState surfaces a " "stub with empty logical_statuses so bench rows stay " "column-aligned." ), ) query_cmd.add_argument( "--block-on-contradiction", dest="block_on_contradiction", action="store_true", help=( "Hard-block on lexical contradictions (default: label-" "only). Strict mode: questions like 'which unmarried " "spouse is X married to' return PREFLIGHT_BLOCKED." ), ) query_cmd.add_argument( "--soft-preflight", dest="soft_preflight", action="store_true", help=( "Ticket #000011 — opt-in to the model-assisted soft " "preflight sidecar. Adds one short LLM round-trip " "(~200ms median) before the main answer call; the model " "classifies the question shape and returns a SOFT_* " "advisory hint that surfaces as `· soft: <label>` on " "the audit-line tail. NEVER enters the verifier proof " "path; cannot create PREFLIGHT_OK or PREFLIGHT_BLOCKED." ), ) query_cmd.add_argument( "--no-canonical-preflight", dest="no_canonical_preflight", action="store_true", help=( "Disable the math/logic π* short-circuit. Pure-arithmetic " "(e.g. '0.1 + 0.2') and pure-propositional ('A IMPL B') " "questions normally bypass RAG and answer via " "arithmetic@v1 / logic-kernel@v1 directly. Pass this " "flag to force RAG against the LLM (useful when bench-" "comparing the canonical answer with the model's reply)." ), ) witness_group = query_cmd.add_mutually_exclusive_group() witness_group.add_argument( "--witness", dest="witness_override", action="store_const", const="on", help=( "Ticket #000028 — multi-modality witness. On canonical-shape " "questions (arithmetic@v1, logic-kernel@v1, future kernels), " "fan out kernel + cache + LLM in parallel and verify cross-" "modality agreement via kernel-canonicalization. Adds one LLM " "call (~2-5s) on top of the canonical fast path; default OFF " "to keep arithmetic queries at ~10ms." ), ) witness_group.add_argument( "--no-witness", dest="witness_override", action="store_const", const="off", help="Disable the witness path even when policy enables it.", ) progress_group = query_cmd.add_mutually_exclusive_group() progress_group.add_argument( "--progress", dest="progress_override", action="store_const", const="on", help=( "Force per-stage state-machine emission to stderr (per-shard " "× per-route search lines, llm.start/done, verify, persist, " "etc.). Default: on at TTY, off when stderr is piped. " "Format: [arborist NN.NNs] stage.name key=value …" ), ) progress_group.add_argument( "--no-progress", dest="progress_override", action="store_const", const="off", help=( "Suppress state-machine emission even when stderr is a TTY. " "Use when running interactively but piping output through " "tools that misbehave on stderr noise." ), ) query_cmd.set_defaults( func=_cmd_query, progress_override=None, witness_override=None, ) inspect_cmd = sub.add_parser( "inspect", help=( "sidecar diagnostic for a providence_cache record — pulls " "source chunks and classifies each unverified span " "(paraphrase / trailing_artifact / interior_elision / " "synthetic_elision_inside_quote / " "no_overlap). Read-only, " "no audit events, no v9.8 field changes." ), ) inspect_cmd.add_argument( "--cache-key", dest="cache_key", required=True, help="64-char hex cache_key of the providence record to inspect", ) inspect_cmd.add_argument( "--qa-db", dest="qa_db", default=None, help="path to qa.db (default: <shards>/qa.db or ~/.arborist/qa.db)", ) inspect_cmd.add_argument( "--json", action="store_true", help="emit raw diagnosis as JSON (default: human render)", ) inspect_cmd.set_defaults(func=_cmd_inspect) losses_cmd = sub.add_parser( "losses", help=( "list adapter LossReport sidecar rows (ticket #000022). " "Read-only; never enters proof path" ), ) losses_cmd.add_argument( "--document-root", dest="document_root", default=None, help="filter to one document_root (hex)", ) losses_cmd.add_argument( "--chunk-id", dest="chunk_id", type=int, default=None, help="filter to one chunk_id", ) losses_cmd.add_argument( "--kind", default=None, help="filter to one loss_kind (ref_tag, file_link, html_chrome, ...)", ) losses_cmd.add_argument( "--stage", default=None, help="filter to one stage (wikitext_base, html_normalize, ingest)", ) losses_cmd.add_argument( "--summary", action="store_true", help="aggregate by (stage, loss_kind, loss_mode)", ) losses_cmd.add_argument( "--limit", type=int, default=200, help="row cap for non-summary mode (default 200)", ) losses_cmd.add_argument( "--json", action="store_true", help="emit raw rows as JSON (default: human render)", ) losses_cmd.set_defaults(func=_cmd_losses) prov_cmd = sub.add_parser( "providence", help="list or falsify providence_cache records", ) prov_cmd.add_argument("--document-uri", dest="document_uri", default=None) prov_cmd.add_argument("--source-root", dest="source_root", default=None) prov_cmd.add_argument("--limit", type=int, default=20) prov_cmd.add_argument( "--falsify", default=None, metavar="CACHE_KEY", help=( "mark a providence_cache record as failed/stale/quarantined. " "Lookups will skip it. Audit chain records the act" ), ) prov_cmd.add_argument( "--state", default="failed", choices=["failed", "stale", "quarantined"], help="falsification state to set (default: failed)", ) prov_cmd.add_argument( "--reason", default=None, help="reason text stored in falsifications log", ) prov_cmd.add_argument( "--by-actor", dest="by_actor", default=None, help="who is falsifying (default: $USER)", ) prov_cmd.add_argument( "--show-preflight", dest="show_preflight", default=None, metavar="CACHE_KEY_PREFIX", help=( "Pull the preflight stage payload from a row's " "run_dag_blob. Match by 12-char prefix. Renders the " "preflight stage hash + run-DAG stage list. Operator " "tool for inspecting the policy state that governed a " "cached row (#000009 §7.2)." ), ) prov_cmd.set_defaults(func=_cmd_providence) ce_cmd = sub.add_parser( "controller-events", help=( "list controller_events advisory rows from #000037 Phase 2 " "(QA-runner → controller decision/difficulty/budget rows)" ), ) ce_cmd.add_argument("--limit", type=int, default=20) ce_cmd.add_argument( "--kind", choices=[ "controller_decision", "controller_difficulty", "controller_budget_allocation", "controller_falsification_proposal", ], default=None, help="filter to one event_kind", ) ce_cmd.add_argument( "--organism-prefix", dest="organism_prefix", default=None, help='match organism_root LIKE prefix (e.g. "qa:" for QA-runner advisories)', ) ce_cmd.add_argument( "--since-seconds", dest="since_seconds", type=int, default=None, help="only rows recorded within the last N seconds", ) ce_cmd.add_argument( "--body", action="store_true", help="include the JSON body_blob in JSON output (off by default — bodies are bulky)", ) ce_cmd.add_argument( "--json", action="store_true", help="emit JSON instead of a terminal table", ) ce_cmd.set_defaults(func=_cmd_controller_events) burn_cmd = sub.add_parser( "burn", help=( "delete a leaf with no children — providence_cache, document, " "or core (kindergarten use; falsify/evict are audit-preserving)" ), ) burn_cmd.add_argument( "--kind", choices=("providence", "document", "core"), default="providence", help="leaf kind to burn (default: providence — backwards-compatible)", ) burn_cmd.add_argument( "--cache-key", dest="cache_key", default=None, help="cache_key (hex) of the providence record to burn (kind=providence)", ) burn_cmd.add_argument( "--root", dest="root", default=None, help="document_root (hex) of the document/core to burn (kind=document|core)", ) burn_cmd.add_argument( "--reason", default=None, help="reason text recorded in the burn audit event", ) burn_cmd.add_argument( "--by-actor", dest="by_actor", default=None, help="who is burning (default: $USER)", ) burn_cmd.add_argument( "--force", action="store_true", help="burn even if children exist; not recommended", ) burn_cmd.set_defaults(func=_cmd_burn) burn_kg_cmd = sub.add_parser( "burn-kindergarten", help=( "burn every providence_cache record younger than the " "kindergarten window — test-ergonomic mass cleanup that " "matches the mesh-sync kindergarten window so only " "un-broadcast records get busted" ), ) burn_kg_cmd.add_argument( "--kindergarten-seconds", dest="kindergarten_seconds", type=int, default=3600, help=( "burn rows younger than this many seconds (default: 3600 = 1 " "hour, mirrors mesh sync default). 0 = burn everything live." ), ) burn_kg_cmd.add_argument( "--reason", default=None, help="reason text recorded in each providence_burn audit event", ) burn_kg_cmd.add_argument( "--by-actor", dest="by_actor", default=None, help="who is burning (default: $USER)", ) burn_kg_cmd.add_argument( "--force", action="store_true", help="burn even if rows have falsification children", ) burn_kg_cmd.add_argument( "--dry-run", dest="dry_run", action="store_true", help="report what would burn without writing", ) burn_kg_cmd.add_argument( "--verbose", type=int, default=10, help="include this many items in the result JSON (default: 10)", ) burn_kg_cmd.set_defaults(func=_cmd_burn_kindergarten) reclassify_cmd = sub.add_parser( "reclassify", help="re-run the verifier against existing live providence records " "(no LLM calls; relabels stale classifications)", ) reclassify_cmd.add_argument( "--qa-db", dest="qa_db", default=None, help="path to qa.db (default: <shards>/qa.db or ~/.arborist/qa.db)", ) reclassify_cmd.add_argument( "--limit", type=int, default=0, help="reclassify at most N records (0 = unlimited)", ) reclassify_cmd.add_argument( "--dry-run", dest="dry_run", action="store_true", help="report what would change without writing", ) reclassify_cmd.add_argument( "--entity-policy", dest="entity_policy", default=None, choices=["strict", "hybrid", "drop", "proximity"], help=( "how the entity path classifies: 'strict' (legacy, overclaims), " "'hybrid' (default — caps at HYBRID), 'drop' (skip entity path → " "UNGROUNDED), 'proximity' (STRICT only if N entities cluster within " "W chars in source)" ), ) reclassify_cmd.add_argument( "--compare", dest="compare", action="store_true", help="run all four entity policies side-by-side without writing", ) reclassify_cmd.set_defaults(func=_cmd_reclassify) emergent_cmd = sub.add_parser( "emergent", help="surface UNGROUNDED/HYBRID claims — corpus-growth signal", ) emergent_cmd.add_argument( "--aggregate", action="store_true", help="rank unverified quotes by frequency (vs per-record list)", ) emergent_cmd.add_argument("--limit", type=int, default=20) emergent_cmd.set_defaults(func=_cmd_emergent) evict_cmd = sub.add_parser( "evict", help="demote surface chunks hot→cold (NULL content, retain leaf_hash)", ) evict_cmd.add_argument( "--source-type", dest="source_type", default=None, help="restrict to one source_type", ) evict_cmd.add_argument( "--older-than-days", dest="older_than_days", type=int, default=None, help="only evict docs older than N days", ) evict_cmd.add_argument( "--document-root", action="append", default=None, help="explicit document_root(s) to evict; repeatable", ) evict_cmd.set_defaults(func=_cmd_evict) rehydrate_cmd = sub.add_parser( "rehydrate", help="refetch URI, verify leaves, restore cold content if root matches", ) rehydrate_cmd.add_argument( "--document-root", action="append", default=None, help="explicit document_root(s) to rehydrate; repeatable", ) rehydrate_cmd.add_argument( "--all-cold", dest="all_cold", action="store_true", help="rehydrate every document with cold chunks", ) rehydrate_cmd.set_defaults(func=_cmd_rehydrate) activity_cmd = sub.add_parser( "activity", help="recent Q&A + freshly cached docs (agent-readable timeline)", ) activity_cmd.add_argument( "--limit", type=int, default=10, help="max items per category (default 10)", ) activity_cmd.add_argument( "--since-seconds", dest="since_seconds", type=int, default=0, help="only events newer than this many seconds (0 = all time, default)", ) activity_cmd.add_argument( "--preview-chars", dest="preview_chars", type=int, default=240, help="answer preview length (default 240 chars)", ) activity_cmd.set_defaults(func=_cmd_activity) stats_cmd = sub.add_parser("stats", help="counts: docs, chunks, edges, audit") stats_cmd.set_defaults(func=_cmd_stats) canon_cmd = sub.add_parser( "canon", help=( "canonicalize input via a registered π* (direct projection, " "no RAG, no LLM)" ), ) canon_cmd.add_argument( "key", nargs="?", help="π* key, e.g. arithmetic@v1, logic-kernel@v1, code-py-ast@v1", ) canon_cmd.add_argument( "input", nargs="?", help="raw input string to canonicalize (UTF-8)", ) canon_cmd.add_argument( "--list", action="store_true", help="list registered π* keys + their domains and exit", ) canon_cmd.add_argument( "--json", action="store_true", help="emit {pi_star_ref, input, canonical, canonical_sha256}", ) canon_cmd.set_defaults(func=_cmd_canon) analyze_cmd = sub.add_parser( "analyze", help="compression spectrum, depth distribution, audit chain integrity", ) analyze_cmd.add_argument( "--gravity-top", dest="gravity_top", type=int, default=10, help="N top inbound-linked documents to report (default 10)", ) analyze_cmd.set_defaults(func=_cmd_analyze) # ----- snapshot subcommands ---------------------------------------------- snap_cmd = sub.add_parser( "snapshot", help="corpus-level Merkle snapshots: pin a forest state by single root", ) snap_sub = snap_cmd.add_subparsers(dest="snap_op", required=True) snap_create = snap_sub.add_parser( "create", help="compute snapshot_root from current corpus, persist + audit" ) snap_create.add_argument("--reason", default="manual") snap_create.add_argument( "--parent", default=None, help="explicit parent_snapshot hex (default: auto-link to latest prior snapshot)", ) snap_create.set_defaults(func=_cmd_snapshot_create) snap_list = snap_sub.add_parser("list", help="recent snapshots, newest first") snap_list.add_argument("--limit", type=int, default=20) snap_list.set_defaults(func=_cmd_snapshot_list) snap_verify = snap_sub.add_parser( "verify", help="recompute root from current corpus; matches=True iff nothing has changed", ) snap_verify.add_argument("snapshot_root", help="hex snapshot_root to verify") snap_verify.set_defaults(func=_cmd_snapshot_verify) snap_diff = snap_sub.add_parser( "diff", help="coarse drift signal between a snapshot and the current corpus", ) snap_diff.add_argument("snapshot_root", help="hex snapshot_root to diff against current") snap_diff.set_defaults(func=_cmd_snapshot_diff) # ----- substrate subcommands (ticket #000012 Phase 1a + future) ---------- # Was `arborist v8 score` until 2026-05-10; renamed for naming-consistency # with the arborist/substrate/ dir, which is itself the post-rename home # of what used to live under arborist/v8/. The ``v`` in ``v8`` referred # to the substrate-paper version, which collided with the v9.8 SQLite # schema version and confused readers. substrate_cmd = sub.add_parser( "substrate", help="Merkle-AGI substrate primitives (ForkScore + future paper specs)", ) substrate_sub = substrate_cmd.add_subparsers(dest="substrate_op", required=True) substrate_score = substrate_sub.add_parser( "score", help="ForkScore over (parent, child) bench-result JSON files (#000012)", ) substrate_score.add_argument( "--parent", required=True, help="path to parent bench-result JSON (from `bench.batteries.runner --all`)", ) substrate_score.add_argument( "--child", required=True, help="path to child bench-result JSON", ) substrate_score.add_argument( "--weights", default=None, help="optional path to a weights JSON file; falls through to DEFAULT_WEIGHTS", ) substrate_score.add_argument( "--capital-delta", dest="capital_delta", type=float, default=0.0, help="capital cost delta from #000020 ledger; positive = child costs more", ) substrate_score.add_argument( "--selfmodel-calibration-gain", dest="selfmodel_calibration_gain", type=float, default=0.0, help="SelfModel calibration improvement (parent→child); 0 if unmeasured", ) substrate_score.add_argument( "--audit-completeness", dest="audit_completeness", type=float, default=0.0, help="fraction of state-changes with audit-event in 0..1", ) substrate_score.add_argument( "--validator-diversity", dest="validator_diversity", type=float, default=0.0, help="multi-validator diversity score; 0 in single-validator mode", ) substrate_score.add_argument( "--security-risk", dest="security_risk", type=float, default=0.0, help="reserved; 0 in Phase 1a (no security-bench yet)", ) substrate_score.add_argument( "--complexity-delta", dest="complexity_delta", type=float, default=0.0, help="reserved; 0 in Phase 1a", ) substrate_score.add_argument( "--memory-invalidation-count", dest="memory_invalidation_count", type=float, default=0.0, help="count of memory_records the fork would falsify", ) substrate_score.add_argument( "--out", default=None, help=( "optional output file path; ScoredFork JSON is also written " "here in addition to stdout. Default: stdout only. The CI " "/ make bench-fork-score targets pin this to " "bench/results/fork_score_report.json so downstream graders " "/ ForkScore-aware mesh peers can ingest the artifact." ), ) # ----- Phase 1c (#000012 §7) — branch-set persistence --------------- # Default off. When --branch-set is present a row is written to # ``fork_score_branches``; absent ⇒ pure-function semantics # (Phase 1a behavior preserved). substrate_score.add_argument( "--branch-set", dest="branch_set", default=None, help=( "checkpoint identity (e.g. parent_root + ts); when present, " "writes one row to fork_score_branches sibling table" ), ) substrate_score.add_argument( "--branch-id", dest="branch_id", default=None, help="fork identifier; defaults to --child-root when omitted", ) substrate_score.add_argument( "--parent-root", dest="parent_root", default=None, help="shared parent root (required when --branch-set is given)", ) substrate_score.add_argument( "--child-root", dest="child_root", default=None, help="child root (nullable for in-flight branches)", ) substrate_score.add_argument( "--persist-shard", dest="persist_shard", default=None, help=( "SQLite path to write fork_score_branches row to; defaults " "to --db (the current arborist target shard)" ), ) substrate_score.add_argument( "--weights-id", dest="weights_id", default=None, help=( "opaque label for the WeightSet used; defaults to " "'default' or the basename of --weights" ), ) substrate_score.set_defaults(func=_cmd_substrate_score) # ----- memory subcommands (ticket #000017) -------------------------------- memory_cmd = sub.add_parser( "memory", help="lifelong-learning audit summary (ticket #000017)", ) memory_sub = memory_cmd.add_subparsers( dest="memory_op", required=True ) mem_snap = memory_sub.add_parser( "snapshot", help="build a memory snapshot from current store state", ) mem_snap.set_defaults(func=_cmd_memory_snapshot) mem_show = memory_sub.add_parser( "show", help="print a memory record by root, or the latest live one", ) mem_show.add_argument( "--root", default=None, help="hex memory_root (default: latest live)", ) mem_show.set_defaults(func=_cmd_memory_show) mem_branches = memory_sub.add_parser( "branches", help="list branch summaries attached to a memory_root", ) mem_branches.add_argument( "--root", default=None, help="hex memory_root (default: latest live)", ) mem_branches.set_defaults(func=_cmd_memory_branches) mem_fals = memory_sub.add_parser( "falsify", help="mark a memory_root falsified" ) mem_fals.add_argument("root", help="hex memory_root to falsify") mem_fals.add_argument( "--reason", required=True, help="why this memory is falsified" ) mem_fals.add_argument( "--branch-id", dest="branch_id", default=None, help="optional triggering branch_id", ) mem_fals.set_defaults(func=_cmd_memory_falsify) # ----- capital subcommands (ticket #000020) ------------------------------- capital_cmd = sub.add_parser( "capital", help="8-capital-form cost ledger (ticket #000020)", ) capital_sub = capital_cmd.add_subparsers( dest="capital_op", required=True ) cap_summary = capital_sub.add_parser( "summary", help="aggregate per-form sums across the ledger", ) cap_summary.add_argument( "--op-type", dest="op_type", default=None, help="filter to one op_type", ) cap_summary.add_argument( "--since", type=int, default=None, help="only rows with recorded_at >= this Unix timestamp", ) cap_summary.set_defaults(func=_cmd_capital_summary) cap_op = capital_sub.add_parser( "op-cost", help="per-form totals for one op_type" ) cap_op.add_argument("op_type", help="e.g. ingest|qa|distill") cap_op.set_defaults(func=_cmd_capital_op_cost) cap_top = capital_sub.add_parser( "top", help="top-N op_types by total in one capital form" ) cap_top.add_argument( "--form", required=True, choices=[ "living", "material", "financial", "intellectual", "experiential", "social", "cultural", "spiritual", ], ) cap_top.add_argument("--limit", type=int, default=10) cap_top.set_defaults(func=_cmd_capital_top) # ----- selfmodel subcommands (ticket #000014) ----------------------------- selfmodel_cmd = sub.add_parser( "selfmodel", help="agent identity record: capability claims, falsification (ticket #000014)", ) selfmodel_sub = selfmodel_cmd.add_subparsers( dest="selfmodel_op", required=True ) sm_snap = selfmodel_sub.add_parser( "snapshot", help="build a SelfModel from current store state and persist it", ) sm_snap.set_defaults(func=_cmd_selfmodel_snapshot) sm_show = selfmodel_sub.add_parser( "show", help="print a SelfModel by root, or the latest live one", ) sm_show.add_argument( "--root", default=None, help="hex selfmodel_root (default: latest live)", ) sm_show.set_defaults(func=_cmd_selfmodel_show) sm_fals = selfmodel_sub.add_parser( "falsify", help="mark a SelfModel falsified with a reason", ) sm_fals.add_argument("root", help="hex selfmodel_root to falsify") sm_fals.add_argument( "--reason", required=True, help="why this SelfModel is falsified" ) sm_fals.add_argument( "--claim-hash", dest="claim_hash", default=None, help="optional triggering capability-claim hash", ) sm_fals.set_defaults(func=_cmd_selfmodel_falsify) sm_list = selfmodel_sub.add_parser( "list", help="list recent SelfModel rows, newest first" ) sm_list.add_argument("--limit", type=int, default=20) sm_list.set_defaults(func=_cmd_selfmodel_list) # ----- warrant resolver (#000031 Phase 2) -------------------------------- warrant_status_cmd = sub.add_parser( "warrant-status", help="show citation-resolver matches per claim-pack record (read-only)", ) warrant_status_cmd.add_argument( "--shards-dir", dest="shards_dir", default=None, help="shards directory (overrides --shards-dir from global)", ) warrant_status_cmd.add_argument( "--limit", type=int, default=3, help="max candidate matches per record" ) warrant_status_cmd.set_defaults(func=_cmd_warrant_status) warrant_resolve_cmd = sub.add_parser( "warrant-resolve", help="run citation resolver; with --write, write derivations rows binding " "claim-pack records to surface chunks (Merkle proof)", ) warrant_resolve_cmd.add_argument( "--shards-dir", dest="shards_dir", default=None, help="shards directory (overrides --shards-dir from global)", ) warrant_resolve_cmd.add_argument( "--write", action="store_true", help="actually write derivations rows (default: dry-run summary)", ) warrant_resolve_cmd.add_argument( "--limit", type=int, default=1, help="how many top matches per record (default 1)" ) warrant_resolve_cmd.add_argument( "--use-aliases", dest="use_aliases", action="store_true", help=( "look up registered citation + term aliases at resolve " "time (#000041 + #000042); alias-resolved derivations " "carry process_id 'warrant-resolver-v1+alias'" ), ) warrant_resolve_cmd.set_defaults(func=_cmd_warrant_resolve) # ----- unconscious sweep (#000037 §3.1 partial) -------------------------- sweep_cmd = sub.add_parser( "sweep", help="unconscious sweep: re-run cross-checks on data that bypassed " "meta-cognition at ingest time (#000037 §3.1)", ) sweep_cmd.add_argument( "--shards-dir", dest="shards_dir", default=None, help="shards directory (overrides --shards-dir from global)", ) sweep_cmd.add_argument( "--target", choices=["warrants", "all"], default="warrants", help=( "sweep target: 'warrants' = re-run warrant resolver against every " "claim-pack record (#000031), idempotent at PK level. 'all' " "reserved for the full bicameral sweep landing later " "(canonical-projection probe / freshness probe / document-content " "witness — schema-bumped per #000037 §12 trigger)." ), ) sweep_cmd.add_argument( "--write", action="store_true", help="materialize derivations rows for newly-resolved records " "(default: dry-run; report what would land)", ) sweep_cmd.add_argument( "--limit", type=int, default=1, help="how many top matches per record to consider (default 1)", ) sweep_cmd.add_argument( "--use-aliases", dest="use_aliases", action="store_true", help="apply citation + term aliases (#000041 + #000042) at resolve time", ) sweep_cmd.set_defaults(func=_cmd_sweep) # ----- alias subcommands (#000041 + #000042) ----------------------------- alias_cmd = sub.add_parser( "alias", help="curated citation + term aliases (audit-disciplined; opt-in)", ) alias_sub = alias_cmd.add_subparsers(dest="alias_op", required=True) # alias citation {add,list,remove} alias_cit = alias_sub.add_parser( "citation", help="citation aliases (#000041): substitute textbook for proprietary cite", ) alias_cit_sub = alias_cit.add_subparsers(dest="alias_cit_op", required=True) cit_add = alias_cit_sub.add_parser( "add", help="add a citation alias (refuses without --by)" ) cit_add.add_argument("original", help="original source_reference string") cit_add.add_argument( "--substitute", required=True, help="replacement citation string" ) cit_add.add_argument( "--author", action="append", default=None, help="author of the substitute work (repeatable)", ) cit_add.add_argument( "--title", default="", help="title of the substitute work" ) cit_add.add_argument( "--by", required=True, help="who decided this alias (audit field; refuses if empty)", ) cit_add.add_argument( "--rationale", default="", help="why this alias is appropriate" ) cit_add.add_argument( "--aliases-db", default=None, help="aliases DB path (default: <shards-dir>/000.db)", ) cit_add.set_defaults(func=_cmd_alias_citation_add) cit_list = alias_cit_sub.add_parser( "list", help="list registered citation aliases" ) cit_list.add_argument( "--filter", default=None, help="substring filter on original_ref" ) cit_list.add_argument("--aliases-db", default=None) cit_list.set_defaults(func=_cmd_alias_citation_list) cit_rm = alias_cit_sub.add_parser( "remove", help="remove a citation alias by (original, substitute)" ) cit_rm.add_argument("original") cit_rm.add_argument("--substitute", required=True) cit_rm.add_argument("--aliases-db", default=None) cit_rm.set_defaults(func=_cmd_alias_citation_remove) # alias term {add,list,remove} alias_term = alias_sub.add_parser( "term", help="term aliases (#000042): vocabulary bridge for old vs modern words", ) alias_term_sub = alias_term.add_subparsers(dest="alias_term_op", required=True) term_add = alias_term_sub.add_parser( "add", help="add a term alias (refuses without --by)" ) term_add.add_argument("term", help="modern term used in claim-pack records") term_add.add_argument( "alternate", help="historical / foreign / alternate term used in textbook prose" ) term_add.add_argument( "--domain", required=True, help="domain string (e.g., 'geometry', 'logic')", ) term_add.add_argument( "--by", required=True, help="who decided this alias (audit field; refuses if empty)", ) term_add.add_argument( "--rationale", default="", help="why this alias is appropriate" ) term_add.add_argument("--aliases-db", default=None) term_add.set_defaults(func=_cmd_alias_term_add) term_list = alias_term_sub.add_parser( "list", help="list registered term aliases" ) term_list.add_argument("--domain", default=None) term_list.add_argument( "--filter", default=None, help="substring filter on term" ) term_list.add_argument("--aliases-db", default=None) term_list.set_defaults(func=_cmd_alias_term_list) term_rm = alias_term_sub.add_parser( "remove", help="remove a term alias by (term, alternate, domain)" ) term_rm.add_argument("term") term_rm.add_argument("alternate") term_rm.add_argument("--domain", required=True) term_rm.add_argument("--aliases-db", default=None) term_rm.set_defaults(func=_cmd_alias_term_remove) # ----- mesh subcommands (off by default) --------------------------------- mesh_cmd = sub.add_parser( "mesh", help="federation/gossip layer (off by default; opt-in via 'mesh enable')", ) mesh_sub = mesh_cmd.add_subparsers(dest="mesh_op", required=True) mesh_status = mesh_sub.add_parser("status", help="show enabled flag, identity, current epoch + roster") mesh_status.set_defaults(func=_cmd_mesh_status) mesh_init = mesh_sub.add_parser("init", help="generate this peer's keys; create epoch 0") mesh_init.add_argument("--group", required=True, help="group name") mesh_init.add_argument("--member-id", dest="member_id", default=None, help="optional fixed member id (default: random 8-hex)") mesh_init.set_defaults(func=_cmd_mesh_init) mesh_enable = mesh_sub.add_parser("enable", help="flip the mesh.enabled flag on") mesh_enable.set_defaults(func=_cmd_mesh_enable) mesh_disable = mesh_sub.add_parser("disable", help="flip the mesh.enabled flag off") mesh_disable.set_defaults(func=_cmd_mesh_disable) mesh_members = mesh_sub.add_parser("members", help="list current epoch's roster") mesh_members.set_defaults(func=_cmd_mesh_members) mesh_add = mesh_sub.add_parser("add", help="admin-only: add a peer to the roster (bumps epoch)") mesh_add.add_argument("--member-id", dest="member_id", required=True) mesh_add.add_argument("--sign-pub", dest="sign_pub", required=True, help="hex Ed25519 pubkey (32 bytes / 64 hex chars)") mesh_add.add_argument("--dh-pub", dest="dh_pub", required=True, help="hex X25519 pubkey") mesh_add.add_argument("--role", choices=["admin", "member"], default="member") mesh_add.set_defaults(func=_cmd_mesh_add) mesh_kick = mesh_sub.add_parser("kick", help="admin-only: evict a peer (bumps epoch; old signatures stay valid, new gossip is opaque to them)") mesh_kick.add_argument("--member-id", dest="member_id", required=True) mesh_kick.add_argument("--reason", required=True) mesh_kick.set_defaults(func=_cmd_mesh_kick) mesh_rotate = mesh_sub.add_parser("rotate", help="refresh epoch secret without changing roster") mesh_rotate.add_argument("--reason", default="scheduled") mesh_rotate.set_defaults(func=_cmd_mesh_rotate) mesh_serve = mesh_sub.add_parser( "serve", help="run the HTTP gossip server (blocks until SIGINT)", ) mesh_serve.add_argument("--host", default="127.0.0.1", help="bind host (default: 127.0.0.1)") mesh_serve.add_argument("--port", type=int, default=8400, help="bind port (default: 8400)") mesh_serve.set_defaults(func=_cmd_mesh_serve) mesh_sync = mesh_sub.add_parser( "sync", help="announce local document_roots to a peer's gossip server", ) mesh_sync.add_argument("--peer", required=True, help="peer URL, e.g. http://other.example.com:8400") mesh_sync.add_argument("--limit", type=int, default=100, help="announce at most N most-recent items per category (default: 100)") mesh_sync.add_argument("--verbose", type=int, default=10, help="include this many ack details in output (default: 10)") mesh_sync.add_argument( "--no-roots", dest="no_roots", action="store_true", help="skip ANNOUNCE_ROOT broadcast (only push falsifications)", ) mesh_sync.add_argument( "--no-falsifications", dest="no_falsifications", action="store_true", help="skip ANNOUNCE_FALSIFICATION broadcast (only push roots)", ) mesh_sync.add_argument( "--kindergarten-seconds", dest="kindergarten_seconds", type=int, default=3600, help=( "hold records younger than this many seconds back from the " "broadcast (default: 3600 = 1 hour). Gives operators time to " "burn or falsify before peers see it. 0 = broadcast everything." ), ) mesh_sync.set_defaults(func=_cmd_mesh_sync) mesh_pull = mesh_sub.add_parser( "pull", help="pull one document body from a peer by document_root", ) mesh_pull.add_argument("--root", required=True, help="64-char hex document_root to pull") mesh_pull.add_argument("--peer", required=True, help="peer URL, e.g. http://other.example.com:8400") mesh_pull.set_defaults(func=_cmd_mesh_pull) crawl_cmd = sub.add_parser( "crawl", help=( "BFS-discover same-domain URLs from a seed; optionally ingest " "and store ETag/Last-Modified for cheap recrawl-checks " "(requires arborist[crawler] extras)" ), ) crawl_cmd.add_argument("--seed-url", dest="seed_url", required=True) crawl_cmd.add_argument("--depth", type=int, default=2, help="max BFS depth (default: 2)") crawl_cmd.add_argument( "--max-pages", dest="max_pages", type=int, default=0, help="cap discovery at N URLs (0 = no cap, depth is the only bound; default: 0)", ) crawl_cmd.add_argument( "--ingest", action="store_true", help="ingest the discovered pages into --db (default: print URL list only)", ) crawl_cmd.add_argument( "--fast", action="store_true", help=( "fast_mode: 5s timeouts, CPU*3 parallel page workers, ignore " "robots.txt crawl-delay (Disallow is still honored). Use only " "against domains where aggressive fetching is acceptable." ), ) crawl_cmd.add_argument( "--author", default=None, help=( "default author surname (#000031 Phase 1 follow-up). " "Appended to ingested document titles when the HTML's " "<title> doesn't already carry the surname. Threads into " "_shard_matches_citation's title-haystack for warrant " "resolution. Only relevant with --ingest." ), ) crawl_cmd.set_defaults(func=_cmd_crawl) crawler_cmd = sub.add_parser( "crawler", help="crawler maintenance verbs (recrawl-check, ...)", ) crawler_sub = crawler_cmd.add_subparsers(dest="crawler_op", required=True) recrawl_check_cmd = crawler_sub.add_parser( "recrawl-check", help=( "send conditional HEAD requests for ingested documents and " "classify each as fresh/stale/gone/unreachable" ), ) recrawl_check_cmd.add_argument( "--domain", default=None, help="restrict to documents whose URI contains this domain", ) recrawl_check_cmd.add_argument( "--limit", type=int, default=100, help="check at most N documents (oldest checks first; default: 100)", ) recrawl_check_cmd.set_defaults(func=_cmd_crawler_recrawl_check) return p
[docs] def main(argv: list[str] | None = None) -> int: args = build_parser().parse_args(argv) return args.func(args)
if __name__ == "__main__": raise SystemExit(main())