"""Per-run Merkle-DAG provenance for providence records.
Each query/ask call passes through several stages:
question → retrieval → context → prompt → answer → verify → final_label
Each stage emits a hash; the run's identity is the Merkle root over the
ordered sequence of stage hashes. Stored on the providence record as
``run_dag_root`` (alongside ``cache_key``). The DAG is verifiable: given
the persisted node list & the same Merkle conventions arborist uses
elsewhere (non-commutative HashCombine, prefix 0x03, leaf prefix 0x00,
self-duplicate odd rule), an auditor can recompute the root from the
nodes & confirm the run was constructed as recorded.
Distinct from the linear ``audit_events`` chain — that chain tracks
state-changing operations across the DB. This DAG tracks the
computation provenance of one specific answer. Both coexist; the
record's ``audit_event_hash`` links to the chain, ``run_dag_root`` &
``run_dag_blob`` carry the per-run computation graph.
Stages chosen to mirror the toy-Hermes design (fox 2026-04-30)::
question hash of question_hash (8-dim cache_key dim)
retrieval hash of sources summary (document_roots + roles +
scores) -- captures which docs ranked & how
context context_root (Merkle root over sorted source roots,
the "source" dim of the cache_key)
prompt conversation_hash (the assembled messages)
answer sha256(answer_text)
verify hash of verdict summary (audit_mode, verifier_method,
n_quotes, n_verified, claim_statuses)
final_label hash of (audit_mode, verifier_method, lookup_path)
The DAG is NOT part of cache_key. cache_key inputs (the 8 dims)
determine the answer; the answer determines the DAG. Folding the DAG
back into cache_key would create a circular dependency.
"""
from __future__ import annotations
import hashlib
import json
from arborist.merkle import MerkleTree
def _sha256_hex(s: str) -> str:
# ``errors='surrogatepass'`` lets lone UTF-16 surrogates through as
# their WTF-8 form. Hermes occasionally emits text with unpaired
# surrogates inside multi-byte sequences; bare ``.encode('utf-8')``
# raises UnicodeEncodeError on those, which previously aborted the
# run with no Merkle root. The hash stays deterministic because the
# WTF-8 byte sequence is reversible & unique per input.
return hashlib.sha256(s.encode("utf-8", errors="surrogatepass")).hexdigest()
def _canonical_json(obj) -> str:
return json.dumps(obj, sort_keys=True, separators=(",", ":"), ensure_ascii=False)
[docs]
def localize_failure(
*,
audit_mode: str,
n_sources: int,
n_quotes: int,
n_verified: int,
) -> str | None:
"""Map a non-STRICT verdict to the pipeline stage that introduced
the failure. Returns ``None`` for STRICT outcomes.
Stage labels (in pipeline order):
- ``retrieval`` — no admitted sources. Title/body gates rejected
everything, or the corpus genuinely lacks the topic. Repair path:
ingest more sources or relax the breadth threshold.
- ``context`` — sources admitted but no quotes extracted. Could be
a context-truncation issue (per-source cap dropped the relevant
paragraph) or a model that declined to cite anything. Repair path:
raise per-source cap; tighten prompt.
- ``answer`` — sources retrieved & quotes extracted but they don't
verify. The model either fabricated content, paraphrased inside
quotes, or appended citation tails. Repair path: the
``mechanical_repair`` pass + (when wired) the re-prompt feedback
loop.
The toy-Hermes design pass calls this "chain-segment failure
localization" — debugging becomes typed instead of vague. An
operator reading ``failure_stage='answer'`` knows retrieval &
context were fine; the model is what to fix. ``failure_stage='retrieval'``
means stop tuning the verifier & go ingest a relevant source.
"""
if audit_mode == "STRICT":
return None
if n_sources == 0:
return "retrieval"
if n_quotes == 0:
return "context"
# Quotes were extracted but didn't all verify (or none did).
return "answer"
PREFLIGHT_NODE_VERSION = "preflight-node-v1"
[docs]
def build_preflight_node_payload(
*,
question_state: dict | None = None,
quantifier: dict | None = None,
answer_contract: dict | None = None,
prompt_contract: dict | None = None,
evidence_contract: dict | None = None,
policy_refs: dict | None = None,
) -> dict:
"""Build the canonical nested-clause payload for the preflight
DAG stage. Returns a JSON-ready dict; pair with
:func:`preflight_node_hash` to get the SHA-256 hex.
Five-clause structure per ticket #000009 §8.2 / feedback §3:
- ``classifier`` — quantifier classifier output (#000008):
intensity, matched_token, explicit_count, scope_bound_hint,
is_broad, classifier_version, operational_shape.
- ``answer_contract`` — guard / cap / reject decisions taken
on this run.
- ``prompt_contract`` — reminder enabled / injected /
template_id (#000008 §10.5).
- ``evidence_contract`` — exposure budget, one-claim-per-line
discipline (#000010 §10.4).
- ``policy_refs`` — governance_policy_hash + model_profile_hash
+ answer_mode. Reference-by-hash rather than raw policy
bundles (feedback §4: avoid double-committing
already-hashed state).
Plus the metacog ``question_state`` from #000010 — that's its
own clause for now (logical_statuses, false_premise_hints,
contradiction_pairs). It's hashed separately by
`metacognition.preflight_policy_hash` already.
Any clause may be None / empty — the resulting payload is
still stable. Includes ``node_version`` so legacy runs without
the node can be unambiguously labeled `unavailable_legacy_run`
by audit tools.
"""
return {
"stage": "preflight",
"node_version": PREFLIGHT_NODE_VERSION,
"classifier": dict(quantifier) if quantifier else {},
"answer_contract": dict(answer_contract) if answer_contract else {},
"prompt_contract": dict(prompt_contract) if prompt_contract else {},
"evidence_contract": dict(evidence_contract) if evidence_contract else {},
"policy_refs": dict(policy_refs) if policy_refs else {},
# Metacognition QuestionState carries
# ``preflight_policy_hash`` internally so flipping a metacog
# detector invalidates this clause via that field. Stored
# nested so audit-replay can read all metacog signal in one
# place without descending into the quantifier classifier.
"question_state": dict(question_state) if question_state else {},
}
[docs]
def preflight_node_hash(
*,
question_state: dict | None = None,
quantifier: dict | None = None,
answer_contract: dict | None = None,
prompt_contract: dict | None = None,
evidence_contract: dict | None = None,
policy_refs: dict | None = None,
) -> str:
"""Hash the preflight decision into a stable SHA-256 hex string.
Returns the hash of the nested-clause payload built by
:func:`build_preflight_node_payload`. See that function for the
five-clause structure.
Audit-replay payoff: two cache rows that share the same
question + same model output + same verifier verdict but
different preflight policy state produce different hashes
here, which propagate to ``run_dag_root`` via
:func:`build_run_dag`.
Backward compatibility note: Pre-2026-05-04 (`c36e85c`) callers
used a flat 3-key payload (`question_state` / `quantifier` /
`policy_state`). Hashes computed with that callsite will NOT
match this restructured callsite — `run_dag_root` values for
rows written between `c36e85c` and the current commit are
treated as a discrete generation; they're still verifiable by
re-reading `run_dag_blob` (the persisted blob captures the
payload that was actually hashed).
"""
payload = build_preflight_node_payload(
question_state=question_state,
quantifier=quantifier,
answer_contract=answer_contract,
prompt_contract=prompt_contract,
evidence_contract=evidence_contract,
policy_refs=policy_refs,
)
return _sha256_hex(_canonical_json(payload))
[docs]
def build_run_dag(
*,
question_hash: str,
sources: list[dict],
context_root: str,
conversation_hash: str,
answer_text: str,
audit_mode: str,
verifier_method: str,
n_quotes: int,
n_verified: int,
claim_statuses: list[dict] | None = None,
lookup_path: str | None = None,
evidence_map_root: str | None = None,
answer_mode: str | None = None,
violations: list[dict] | None = None,
raw_answer_text: str | None = None,
parsed_lattice: list | None = None,
rendered_text: str | None = None,
retrieval_plan_hash: str | None = None,
preflight_hash: str | None = None,
preflight_payload: dict | None = None,
) -> dict:
"""Return ``{"root": <hex>, "nodes": [<stage>, <hash>], ...}``.
All inputs are already-computed hashes or text; no I/O. Idempotent &
deterministic — same inputs always produce the same root, byte-for-
byte across machines (as long as the Merkle conventions stay pinned;
they do, via ``arborist.merkle``).
Two base DAG shapes; both gain an optional ``preflight`` stage
when ``preflight_hash`` is supplied (Ticket #000009):
- **Quote mode (default).** 7 stages —
``question / retrieval / context / prompt / answer / verify /
final_label``. Triggered when ``evidence_map_root`` is None.
Backward-compatible with all run_dag_root values written by code
that pre-dates G0. With ``preflight_hash``, becomes 8 stages —
``question / preflight / retrieval / ...``.
- **Claim-lattice-pointer mode (G0 / CTI).** 9 stages —
``question / retrieval / evidence_map / prompt / raw_answer /
parsed_claim_lattice / verify / render / final_label``. Triggered
when ``evidence_map_root`` is non-None. Splits the single
``answer`` node into three: the model's raw output, the parsed
claim-lattice, and the rendered prose with literal spans
interpolated. ``context`` drops out (the context IS the evidence
map). All three of ``raw_answer_text`` / ``parsed_lattice`` /
``rendered_text`` should be supplied; missing args fall back to
``answer_text`` for the raw_answer & render hashes and ``[]`` for
the parsed_lattice hash. With ``preflight_hash``, becomes 10
stages.
``answer_mode`` & ``violations`` fold into the verify & final_label
payloads when provided. ``preflight_hash`` (Ticket #000009) is
optional; when None, the DAG shape remains 7/9 stages exactly so
pre-#000009 records can be re-validated. When supplied, the
preflight stage inserts at position 1 (between ``question`` and
``retrieval``) per ticket #000009 §3.1.
"""
sources_summary = [
{
"document_root": s.get("document_root"),
"source_role": s.get("source_role"),
"score": s.get("score"),
"chunk_idx": s.get("chunk_idx"),
}
for s in sources
]
sources_summary_hash = _sha256_hex(_canonical_json(sources_summary))
# Retrieval stage hash: when a retrieval_plan_hash is supplied
# (per ticket #000001 — provenance binding for operator-influenced
# retrieval inputs like keywords / top_k / over_fetch), the stage
# hash binds BOTH the plan (input) and the sources_summary
# (output). Without a plan supplied, fall back to the historical
# sources-summary-only hash so pre-#000001 records keep their
# run_dag_root values stable. Greenfield records that omit the
# plan stay readable by the run-DAG validator.
if retrieval_plan_hash is not None:
retrieval_hash = _sha256_hex(_canonical_json({
"retrieval_plan_hash": retrieval_plan_hash,
"sources_summary_hash": sources_summary_hash,
}))
else:
retrieval_hash = sources_summary_hash
answer_hash = _sha256_hex(answer_text)
failure_stage = localize_failure(
audit_mode=audit_mode,
n_sources=len(sources),
n_quotes=n_quotes,
n_verified=n_verified,
)
verify_payload = {
"audit_mode": audit_mode,
"verifier_method": verifier_method,
"n_quotes": n_quotes,
"n_verified": n_verified,
"claim_statuses": claim_statuses or [],
"failure_stage": failure_stage,
}
if violations is not None:
verify_payload["violations"] = violations
verify_hash = _sha256_hex(_canonical_json(verify_payload))
final_label_payload = {
"audit_mode": audit_mode,
"verifier_method": verifier_method,
"lookup_path": lookup_path,
}
if answer_mode is not None:
final_label_payload["answer_mode"] = answer_mode
final_label_hash = _sha256_hex(_canonical_json(final_label_payload))
if evidence_map_root is None:
# Quote-mode 7-stage shape — backward-compatible.
nodes = [
{"stage": "question", "hash": question_hash},
{"stage": "retrieval", "hash": retrieval_hash},
{"stage": "context", "hash": context_root},
{"stage": "prompt", "hash": conversation_hash},
{"stage": "answer", "hash": answer_hash},
{"stage": "verify", "hash": verify_hash},
{"stage": "final_label", "hash": final_label_hash},
]
else:
# Pointer-mode 9-stage shape (CTI). ``context`` drops out;
# ``answer`` splits into raw_answer / parsed_claim_lattice /
# render so each provenance step gets its own commitment.
raw_text = raw_answer_text if raw_answer_text is not None else answer_text
rendered = rendered_text if rendered_text is not None else answer_text
raw_answer_hash = _sha256_hex(raw_text)
# Parsed lattice = list of {claim_text, evidence_ids[]} dicts in
# input order; canonical-json so reordering claims changes the
# hash. Pointer ids are run-dependent — we prefer the
# content-addressed evidence_ids here for run-stable provenance.
parsed_lattice_hash = _sha256_hex(
_canonical_json(parsed_lattice or [])
)
rendered_hash = _sha256_hex(rendered)
nodes = [
{"stage": "question", "hash": question_hash},
{"stage": "retrieval", "hash": retrieval_hash},
{"stage": "evidence_map", "hash": evidence_map_root},
{"stage": "prompt", "hash": conversation_hash},
{"stage": "raw_answer", "hash": raw_answer_hash},
{"stage": "parsed_claim_lattice", "hash": parsed_lattice_hash},
{"stage": "verify", "hash": verify_hash},
{"stage": "render", "hash": rendered_hash},
{"stage": "final_label", "hash": final_label_hash},
]
# Ticket #000009 — preflight stage binding. When supplied,
# insert ``preflight`` between ``question`` and ``retrieval``.
# Optional so legacy run_dag_root values from pre-#000009 code
# remain reproducible (None → original 7/9-stage shape). The
# preflight_hash bundles #000008 quantifier output, #000010
# QuestionState, AND the policy decisions taken on this run
# — see preflight_node_hash() for the canonical payload.
if preflight_hash is not None:
nodes.insert(
1,
{"stage": "preflight", "hash": preflight_hash},
)
leaves = [bytes.fromhex(n["hash"]) for n in nodes]
root_hex = MerkleTree.build(leaves).root.hex()
out = {"root": root_hex, "nodes": nodes}
# Ticket #000009 §7.2 — recoverable preflight payload. Storing
# the canonical dict alongside the leaf hash means
# `arborist providence --show-preflight` can render the full
# 5-clause CTI contract (classifier / answer_contract /
# prompt_contract / evidence_contract / policy_refs +
# question_state) from `run_dag_blob` without needing a
# separate column or re-running the classifier. Audit replay
# CAN re-verify the hash matches:
# _sha256_hex(_canonical_json(preflight_payload)) == preflight_hash
# (caller-side check; verify_run_dag does not enforce because
# the hash is in `nodes` and the payload is sidecar data.)
if preflight_payload is not None:
out["preflight_payload"] = preflight_payload
return out
[docs]
def build_reject_run_dag(
*,
question_hash: str,
preflight_hash: str,
rejection_reason: str,
answer_text: str,
audit_mode: str = "UNGROUNDED",
verifier_method: str = "claim_lattice_pointer",
violations: list[dict] | None = None,
preflight_payload: dict | None = None,
) -> dict:
"""3-stage reject-broad run-DAG: ``question → preflight →
final_label``.
Ticket #000009 §8.2 / 2026-05-04 feedback §6.2: preflight
rejection currently early-returns from ``query()`` before the
standard ``build_run_dag()`` runs, so reject rows have no
auditable Merkle commitment. This builder fills that gap with
a minimal DAG shape that captures the rejection without
pretending retrieval / prompt / raw_model_output happened.
The returned shape is INTENTIONALLY shorter than the standard
7/9/8/10-stage shapes — `audit replay can read the stage
list` and tell instantly that this row is a preflight
rejection: 3 stages always means reject path.
`final_label` carries the rejection_reason + answer_text hash
so two rejections that differ only in their (rendered)
rationale string still produce different roots. The
rejection_reason is the canonical string from the violation
(`"preflight rejection — broad-quantifier query with
unbounded scope. ..."`), NOT the operator-facing rendered
answer_text — that lets policy template changes invalidate
the hash even if the operator-visible text is unchanged.
"""
final_label_payload = {
"audit_mode": audit_mode,
"verifier_method": verifier_method,
"lookup_path": "preflight",
"rejection_reason": rejection_reason,
"answer_text_hash": _sha256_hex(answer_text or ""),
}
if violations is not None:
final_label_payload["violations"] = violations
final_label_hash = _sha256_hex(_canonical_json(final_label_payload))
nodes = [
{"stage": "question", "hash": question_hash},
{"stage": "preflight", "hash": preflight_hash},
{"stage": "final_label", "hash": final_label_hash},
]
leaves = [bytes.fromhex(n["hash"]) for n in nodes]
root_hex = MerkleTree.build(leaves).root.hex()
out = {"root": root_hex, "nodes": nodes}
if preflight_payload is not None:
out["preflight_payload"] = preflight_payload
return out
[docs]
def verify_run_dag(blob: str | dict) -> bool:
"""Recompute the Merkle root from ``blob`` and check it matches.
Used by audit tooling. Accepts either a parsed dict or the JSON
string we persist in ``providence_cache.run_dag_blob``.
"""
if isinstance(blob, str):
blob = json.loads(blob)
nodes = blob.get("nodes") or []
if not nodes:
return False
leaves = [bytes.fromhex(n["hash"]) for n in nodes]
return MerkleTree.build(leaves).root.hex() == blob.get("root")