Source code for arborist.qa.dag

"""Per-run Merkle-DAG provenance for providence records.

Each query/ask call passes through several stages:

    question → retrieval → context → prompt → answer → verify → final_label

Each stage emits a hash; the run's identity is the Merkle root over the
ordered sequence of stage hashes. Stored on the providence record as
``run_dag_root`` (alongside ``cache_key``). The DAG is verifiable: given
the persisted node list & the same Merkle conventions arborist uses
elsewhere (non-commutative HashCombine, prefix 0x03, leaf prefix 0x00,
self-duplicate odd rule), an auditor can recompute the root from the
nodes & confirm the run was constructed as recorded.

Distinct from the linear ``audit_events`` chain — that chain tracks
state-changing operations across the DB. This DAG tracks the
computation provenance of one specific answer. Both coexist; the
record's ``audit_event_hash`` links to the chain, ``run_dag_root`` &
``run_dag_blob`` carry the per-run computation graph.

Stages chosen to mirror the toy-Hermes design (fox 2026-04-30)::

    question      hash of question_hash (8-dim cache_key dim)
    retrieval     hash of sources summary (document_roots + roles +
                  scores) -- captures which docs ranked & how
    context       context_root (Merkle root over sorted source roots,
                  the "source" dim of the cache_key)
    prompt        conversation_hash (the assembled messages)
    answer        sha256(answer_text)
    verify        hash of verdict summary (audit_mode, verifier_method,
                  n_quotes, n_verified, claim_statuses)
    final_label   hash of (audit_mode, verifier_method, lookup_path)

The DAG is NOT part of cache_key. cache_key inputs (the 8 dims)
determine the answer; the answer determines the DAG. Folding the DAG
back into cache_key would create a circular dependency.
"""

from __future__ import annotations

import hashlib
import json

from arborist.merkle import MerkleTree


def _sha256_hex(s: str) -> str:
    # ``errors='surrogatepass'`` lets lone UTF-16 surrogates through as
    # their WTF-8 form. Hermes occasionally emits text with unpaired
    # surrogates inside multi-byte sequences; bare ``.encode('utf-8')``
    # raises UnicodeEncodeError on those, which previously aborted the
    # run with no Merkle root. The hash stays deterministic because the
    # WTF-8 byte sequence is reversible & unique per input.
    return hashlib.sha256(s.encode("utf-8", errors="surrogatepass")).hexdigest()


def _canonical_json(obj) -> str:
    return json.dumps(obj, sort_keys=True, separators=(",", ":"), ensure_ascii=False)


[docs] def localize_failure( *, audit_mode: str, n_sources: int, n_quotes: int, n_verified: int, ) -> str | None: """Map a non-STRICT verdict to the pipeline stage that introduced the failure. Returns ``None`` for STRICT outcomes. Stage labels (in pipeline order): - ``retrieval`` — no admitted sources. Title/body gates rejected everything, or the corpus genuinely lacks the topic. Repair path: ingest more sources or relax the breadth threshold. - ``context`` — sources admitted but no quotes extracted. Could be a context-truncation issue (per-source cap dropped the relevant paragraph) or a model that declined to cite anything. Repair path: raise per-source cap; tighten prompt. - ``answer`` — sources retrieved & quotes extracted but they don't verify. The model either fabricated content, paraphrased inside quotes, or appended citation tails. Repair path: the ``mechanical_repair`` pass + (when wired) the re-prompt feedback loop. The toy-Hermes design pass calls this "chain-segment failure localization" — debugging becomes typed instead of vague. An operator reading ``failure_stage='answer'`` knows retrieval & context were fine; the model is what to fix. ``failure_stage='retrieval'`` means stop tuning the verifier & go ingest a relevant source. """ if audit_mode == "STRICT": return None if n_sources == 0: return "retrieval" if n_quotes == 0: return "context" # Quotes were extracted but didn't all verify (or none did). return "answer"
PREFLIGHT_NODE_VERSION = "preflight-node-v1"
[docs] def build_preflight_node_payload( *, question_state: dict | None = None, quantifier: dict | None = None, answer_contract: dict | None = None, prompt_contract: dict | None = None, evidence_contract: dict | None = None, policy_refs: dict | None = None, ) -> dict: """Build the canonical nested-clause payload for the preflight DAG stage. Returns a JSON-ready dict; pair with :func:`preflight_node_hash` to get the SHA-256 hex. Five-clause structure per ticket #000009 §8.2 / feedback §3: - ``classifier`` — quantifier classifier output (#000008): intensity, matched_token, explicit_count, scope_bound_hint, is_broad, classifier_version, operational_shape. - ``answer_contract`` — guard / cap / reject decisions taken on this run. - ``prompt_contract`` — reminder enabled / injected / template_id (#000008 §10.5). - ``evidence_contract`` — exposure budget, one-claim-per-line discipline (#000010 §10.4). - ``policy_refs`` — governance_policy_hash + model_profile_hash + answer_mode. Reference-by-hash rather than raw policy bundles (feedback §4: avoid double-committing already-hashed state). Plus the metacog ``question_state`` from #000010 — that's its own clause for now (logical_statuses, false_premise_hints, contradiction_pairs). It's hashed separately by `metacognition.preflight_policy_hash` already. Any clause may be None / empty — the resulting payload is still stable. Includes ``node_version`` so legacy runs without the node can be unambiguously labeled `unavailable_legacy_run` by audit tools. """ return { "stage": "preflight", "node_version": PREFLIGHT_NODE_VERSION, "classifier": dict(quantifier) if quantifier else {}, "answer_contract": dict(answer_contract) if answer_contract else {}, "prompt_contract": dict(prompt_contract) if prompt_contract else {}, "evidence_contract": dict(evidence_contract) if evidence_contract else {}, "policy_refs": dict(policy_refs) if policy_refs else {}, # Metacognition QuestionState carries # ``preflight_policy_hash`` internally so flipping a metacog # detector invalidates this clause via that field. Stored # nested so audit-replay can read all metacog signal in one # place without descending into the quantifier classifier. "question_state": dict(question_state) if question_state else {}, }
[docs] def preflight_node_hash( *, question_state: dict | None = None, quantifier: dict | None = None, answer_contract: dict | None = None, prompt_contract: dict | None = None, evidence_contract: dict | None = None, policy_refs: dict | None = None, ) -> str: """Hash the preflight decision into a stable SHA-256 hex string. Returns the hash of the nested-clause payload built by :func:`build_preflight_node_payload`. See that function for the five-clause structure. Audit-replay payoff: two cache rows that share the same question + same model output + same verifier verdict but different preflight policy state produce different hashes here, which propagate to ``run_dag_root`` via :func:`build_run_dag`. Backward compatibility note: Pre-2026-05-04 (`c36e85c`) callers used a flat 3-key payload (`question_state` / `quantifier` / `policy_state`). Hashes computed with that callsite will NOT match this restructured callsite — `run_dag_root` values for rows written between `c36e85c` and the current commit are treated as a discrete generation; they're still verifiable by re-reading `run_dag_blob` (the persisted blob captures the payload that was actually hashed). """ payload = build_preflight_node_payload( question_state=question_state, quantifier=quantifier, answer_contract=answer_contract, prompt_contract=prompt_contract, evidence_contract=evidence_contract, policy_refs=policy_refs, ) return _sha256_hex(_canonical_json(payload))
[docs] def build_run_dag( *, question_hash: str, sources: list[dict], context_root: str, conversation_hash: str, answer_text: str, audit_mode: str, verifier_method: str, n_quotes: int, n_verified: int, claim_statuses: list[dict] | None = None, lookup_path: str | None = None, evidence_map_root: str | None = None, answer_mode: str | None = None, violations: list[dict] | None = None, raw_answer_text: str | None = None, parsed_lattice: list | None = None, rendered_text: str | None = None, retrieval_plan_hash: str | None = None, preflight_hash: str | None = None, preflight_payload: dict | None = None, ) -> dict: """Return ``{"root": <hex>, "nodes": [<stage>, <hash>], ...}``. All inputs are already-computed hashes or text; no I/O. Idempotent & deterministic — same inputs always produce the same root, byte-for- byte across machines (as long as the Merkle conventions stay pinned; they do, via ``arborist.merkle``). Two base DAG shapes; both gain an optional ``preflight`` stage when ``preflight_hash`` is supplied (Ticket #000009): - **Quote mode (default).** 7 stages — ``question / retrieval / context / prompt / answer / verify / final_label``. Triggered when ``evidence_map_root`` is None. Backward-compatible with all run_dag_root values written by code that pre-dates G0. With ``preflight_hash``, becomes 8 stages — ``question / preflight / retrieval / ...``. - **Claim-lattice-pointer mode (G0 / CTI).** 9 stages — ``question / retrieval / evidence_map / prompt / raw_answer / parsed_claim_lattice / verify / render / final_label``. Triggered when ``evidence_map_root`` is non-None. Splits the single ``answer`` node into three: the model's raw output, the parsed claim-lattice, and the rendered prose with literal spans interpolated. ``context`` drops out (the context IS the evidence map). All three of ``raw_answer_text`` / ``parsed_lattice`` / ``rendered_text`` should be supplied; missing args fall back to ``answer_text`` for the raw_answer & render hashes and ``[]`` for the parsed_lattice hash. With ``preflight_hash``, becomes 10 stages. ``answer_mode`` & ``violations`` fold into the verify & final_label payloads when provided. ``preflight_hash`` (Ticket #000009) is optional; when None, the DAG shape remains 7/9 stages exactly so pre-#000009 records can be re-validated. When supplied, the preflight stage inserts at position 1 (between ``question`` and ``retrieval``) per ticket #000009 §3.1. """ sources_summary = [ { "document_root": s.get("document_root"), "source_role": s.get("source_role"), "score": s.get("score"), "chunk_idx": s.get("chunk_idx"), } for s in sources ] sources_summary_hash = _sha256_hex(_canonical_json(sources_summary)) # Retrieval stage hash: when a retrieval_plan_hash is supplied # (per ticket #000001 — provenance binding for operator-influenced # retrieval inputs like keywords / top_k / over_fetch), the stage # hash binds BOTH the plan (input) and the sources_summary # (output). Without a plan supplied, fall back to the historical # sources-summary-only hash so pre-#000001 records keep their # run_dag_root values stable. Greenfield records that omit the # plan stay readable by the run-DAG validator. if retrieval_plan_hash is not None: retrieval_hash = _sha256_hex(_canonical_json({ "retrieval_plan_hash": retrieval_plan_hash, "sources_summary_hash": sources_summary_hash, })) else: retrieval_hash = sources_summary_hash answer_hash = _sha256_hex(answer_text) failure_stage = localize_failure( audit_mode=audit_mode, n_sources=len(sources), n_quotes=n_quotes, n_verified=n_verified, ) verify_payload = { "audit_mode": audit_mode, "verifier_method": verifier_method, "n_quotes": n_quotes, "n_verified": n_verified, "claim_statuses": claim_statuses or [], "failure_stage": failure_stage, } if violations is not None: verify_payload["violations"] = violations verify_hash = _sha256_hex(_canonical_json(verify_payload)) final_label_payload = { "audit_mode": audit_mode, "verifier_method": verifier_method, "lookup_path": lookup_path, } if answer_mode is not None: final_label_payload["answer_mode"] = answer_mode final_label_hash = _sha256_hex(_canonical_json(final_label_payload)) if evidence_map_root is None: # Quote-mode 7-stage shape — backward-compatible. nodes = [ {"stage": "question", "hash": question_hash}, {"stage": "retrieval", "hash": retrieval_hash}, {"stage": "context", "hash": context_root}, {"stage": "prompt", "hash": conversation_hash}, {"stage": "answer", "hash": answer_hash}, {"stage": "verify", "hash": verify_hash}, {"stage": "final_label", "hash": final_label_hash}, ] else: # Pointer-mode 9-stage shape (CTI). ``context`` drops out; # ``answer`` splits into raw_answer / parsed_claim_lattice / # render so each provenance step gets its own commitment. raw_text = raw_answer_text if raw_answer_text is not None else answer_text rendered = rendered_text if rendered_text is not None else answer_text raw_answer_hash = _sha256_hex(raw_text) # Parsed lattice = list of {claim_text, evidence_ids[]} dicts in # input order; canonical-json so reordering claims changes the # hash. Pointer ids are run-dependent — we prefer the # content-addressed evidence_ids here for run-stable provenance. parsed_lattice_hash = _sha256_hex( _canonical_json(parsed_lattice or []) ) rendered_hash = _sha256_hex(rendered) nodes = [ {"stage": "question", "hash": question_hash}, {"stage": "retrieval", "hash": retrieval_hash}, {"stage": "evidence_map", "hash": evidence_map_root}, {"stage": "prompt", "hash": conversation_hash}, {"stage": "raw_answer", "hash": raw_answer_hash}, {"stage": "parsed_claim_lattice", "hash": parsed_lattice_hash}, {"stage": "verify", "hash": verify_hash}, {"stage": "render", "hash": rendered_hash}, {"stage": "final_label", "hash": final_label_hash}, ] # Ticket #000009 — preflight stage binding. When supplied, # insert ``preflight`` between ``question`` and ``retrieval``. # Optional so legacy run_dag_root values from pre-#000009 code # remain reproducible (None → original 7/9-stage shape). The # preflight_hash bundles #000008 quantifier output, #000010 # QuestionState, AND the policy decisions taken on this run # — see preflight_node_hash() for the canonical payload. if preflight_hash is not None: nodes.insert( 1, {"stage": "preflight", "hash": preflight_hash}, ) leaves = [bytes.fromhex(n["hash"]) for n in nodes] root_hex = MerkleTree.build(leaves).root.hex() out = {"root": root_hex, "nodes": nodes} # Ticket #000009 §7.2 — recoverable preflight payload. Storing # the canonical dict alongside the leaf hash means # `arborist providence --show-preflight` can render the full # 5-clause CTI contract (classifier / answer_contract / # prompt_contract / evidence_contract / policy_refs + # question_state) from `run_dag_blob` without needing a # separate column or re-running the classifier. Audit replay # CAN re-verify the hash matches: # _sha256_hex(_canonical_json(preflight_payload)) == preflight_hash # (caller-side check; verify_run_dag does not enforce because # the hash is in `nodes` and the payload is sidecar data.) if preflight_payload is not None: out["preflight_payload"] = preflight_payload return out
[docs] def build_reject_run_dag( *, question_hash: str, preflight_hash: str, rejection_reason: str, answer_text: str, audit_mode: str = "UNGROUNDED", verifier_method: str = "claim_lattice_pointer", violations: list[dict] | None = None, preflight_payload: dict | None = None, ) -> dict: """3-stage reject-broad run-DAG: ``question → preflight → final_label``. Ticket #000009 §8.2 / 2026-05-04 feedback §6.2: preflight rejection currently early-returns from ``query()`` before the standard ``build_run_dag()`` runs, so reject rows have no auditable Merkle commitment. This builder fills that gap with a minimal DAG shape that captures the rejection without pretending retrieval / prompt / raw_model_output happened. The returned shape is INTENTIONALLY shorter than the standard 7/9/8/10-stage shapes — `audit replay can read the stage list` and tell instantly that this row is a preflight rejection: 3 stages always means reject path. `final_label` carries the rejection_reason + answer_text hash so two rejections that differ only in their (rendered) rationale string still produce different roots. The rejection_reason is the canonical string from the violation (`"preflight rejection — broad-quantifier query with unbounded scope. ..."`), NOT the operator-facing rendered answer_text — that lets policy template changes invalidate the hash even if the operator-visible text is unchanged. """ final_label_payload = { "audit_mode": audit_mode, "verifier_method": verifier_method, "lookup_path": "preflight", "rejection_reason": rejection_reason, "answer_text_hash": _sha256_hex(answer_text or ""), } if violations is not None: final_label_payload["violations"] = violations final_label_hash = _sha256_hex(_canonical_json(final_label_payload)) nodes = [ {"stage": "question", "hash": question_hash}, {"stage": "preflight", "hash": preflight_hash}, {"stage": "final_label", "hash": final_label_hash}, ] leaves = [bytes.fromhex(n["hash"]) for n in nodes] root_hex = MerkleTree.build(leaves).root.hex() out = {"root": root_hex, "nodes": nodes} if preflight_payload is not None: out["preflight_payload"] = preflight_payload return out
[docs] def verify_run_dag(blob: str | dict) -> bool: """Recompute the Merkle root from ``blob`` and check it matches. Used by audit tooling. Accepts either a parsed dict or the JSON string we persist in ``providence_cache.run_dag_blob``. """ if isinstance(blob, str): blob = json.loads(blob) nodes = blob.get("nodes") or [] if not nodes: return False leaves = [bytes.fromhex(n["hash"]) for n in nodes] return MerkleTree.build(leaves).root.hex() == blob.get("root")