"""Evidence map for claim-lattice-pointer (quote-by-pointer) answer mode.
Builds a deterministic table of evidence objects from already-retrieved
chunks. The model references each object by a short ``pointer_id``
("E1", "E2", ...) which the runtime maps back to a content-addressed
``evidence_id`` for the cache, run-DAG, and audit chain.
Design pinned by G0 ticket (2026-04-29) and the CTI / Clause Lattice
Intelligence reframe:
Models should not generate verbatim quotes.
They should point to evidence IDs extracted by deterministic code.
This kills the synthetic-elision class by construction — the model
never types the quote string, so it can't drop characters from one.
Two-layer id scheme:
- ``pointer_id`` short numeric tag the model sees in the prompt and
writes back in pointer-line answers. ``E`` + the
1-based position of the evidence object in the map
(``E1``, ``E2``, …, ``E37``). One BPE token per id
in standard tokenizers. Stays in the model's
in-distribution citation style.
- ``evidence_id`` content-addressed handle. ``E`` + first 8 hex of
``sha256(chunk_root:offset_start:offset_end)``.
Same chunk + same offsets in two runs → same id,
forever. The cache_key, run-DAG, and audit chain
all use this form so provenance is run-stable.
The verifier (``verify_claim_lattice``) maps each pointer_id the model
writes back to its content-addressed evidence_id before persistence.
The model's literal output is run-dependent (run #1's ``E1`` and run
#2's ``E1`` likely point at different chunks); the content-addressed
layer is what stays stable.
For the first cut every chunk produces exactly one evidence object
covering ``offset_start=0 .. offset_end=len(span)``. Sub-chunk
extraction (paragraph-level, sentence-level) is a future refinement;
the schema already accepts arbitrary offsets so adding finer-grained
splits later doesn't break the contract.
"""
from __future__ import annotations
import hashlib
import re
from dataclasses import asdict, dataclass
from arborist.merkle import MerkleTree
def _sha256_hex(s: str) -> str:
# ``errors='surrogatepass'`` for model-output text containing lone
# UTF-16 surrogates; same rationale as ``arborist.qa.dag._sha256_hex``.
return hashlib.sha256(s.encode("utf-8", errors="surrogatepass")).hexdigest()
[docs]
@dataclass(frozen=True)
class EvidenceObject:
"""One pinned span the model may reference by ``pointer_id``.
All fields are deterministic from inputs; same chunk + offsets yields
the same object byte-for-byte.
- ``pointer_id`` prompt-facing id ("E1", "E2", …) — what the
model sees & writes back. Position-derived;
run-dependent on purpose.
- ``evidence_id`` content-addressed handle ("E" + 8 hex of
sha256(chunk_root:start:end)) — what the
cache, run-DAG, and audit chain use.
Run-stable.
- ``source_root`` document_root the chunk belongs to
- ``document_uri`` human-readable URI (for the renderer)
- ``title`` doc title (for the renderer & prompt)
- ``chunk_idx`` chunk index within the document
- ``chunk_root`` leaf hash of the chunk
- ``offset_start`` byte offset within the chunk (0 for whole-chunk)
- ``offset_end`` end offset (exclusive)
- ``source_role`` role classification (primary_answer_source / ...)
- ``text_hash`` sha256 of the span (for tamper detection)
- ``span`` the literal text (what the renderer interpolates)
"""
pointer_id: str
evidence_id: str
source_root: str
document_uri: str
title: str | None
chunk_idx: int
chunk_root: str
offset_start: int
offset_end: int
source_role: str
text_hash: str
span: str
[docs]
def to_dict(self) -> dict:
return asdict(self)
def _evidence_id_for(chunk_root: str, offset_start: int, offset_end: int) -> str:
"""Content-addressed handle. Same chunk + offsets = same ID, forever."""
h = _sha256_hex(f"{chunk_root}:{offset_start}:{offset_end}")
return f"E{h[:8]}"
[docs]
def build_evidence_map(
chunks: list[dict],
) -> list[EvidenceObject]:
"""Build the evidence table from retrieved chunks.
``chunks`` is a list of dicts with keys:
source_root, document_uri, title (optional), chunk_idx,
chunk_root, span, source_role (optional, default 'unclassified')
Returns a list of ``EvidenceObject``s, one per chunk, in input order.
The 1-based position drives ``pointer_id`` (E1, E2, …); the chunk's
content drives ``evidence_id`` (sha256-derived). For the first cut
each chunk = one whole-span evidence object (offset 0 .. len(span)).
Sub-chunk splitting is a future refinement.
"""
out: list[EvidenceObject] = []
for i, c in enumerate(chunks):
span = c["span"]
offset_start = 0
offset_end = len(span)
chunk_root = c["chunk_root"]
evidence_id = _evidence_id_for(chunk_root, offset_start, offset_end)
out.append(
EvidenceObject(
pointer_id=f"E{i + 1}",
evidence_id=evidence_id,
source_root=c["source_root"],
document_uri=c["document_uri"],
title=c.get("title"),
chunk_idx=c["chunk_idx"],
chunk_root=chunk_root,
offset_start=offset_start,
offset_end=offset_end,
source_role=c.get("source_role", "unclassified"),
text_hash=_sha256_hex(span),
span=span,
)
)
return out
[docs]
def evidence_map_root(evidence: list[EvidenceObject]) -> str:
"""Merkle root over the sorted evidence_id leaves.
Sorting makes the root order-independent — two retrieval runs that
return the same chunks in different orders produce the same root.
Use as the ``evidence_map`` stage hash in the run-DAG.
"""
if not evidence:
return "00" * 32
leaves_hex = sorted(_sha256_hex(e.evidence_id) for e in evidence)
if len(leaves_hex) == 1:
return leaves_hex[0]
leaves = [bytes.fromhex(h) for h in leaves_hex]
return MerkleTree.build(leaves).root.hex()
[docs]
def render_evidence_block(e: EvidenceObject) -> str:
"""Format one evidence object for the LLM prompt.
Header carries the prompt-facing pointer_id, title (or URI tail),
and source role so the model has everything it needs to cite
without typing the span:
=== E1 (Jurassic_Park_(film) | primary_answer_source) ===
<literal span text>
The runtime maps E1 back to the content-addressed evidence_id
before persistence; the model never sees the hex form.
"""
label = (e.title or e.document_uri.rsplit("/", 1)[-1]) or "untitled"
return f"=== {e.pointer_id} ({label} | {e.source_role}) ===\n{e.span}"
[docs]
def render_evidence_map(evidence: list[EvidenceObject]) -> str:
"""Concatenated evidence blocks, ready to drop into the prompt."""
return "\n\n".join(render_evidence_block(e) for e in evidence)
[docs]
def render_evidence_block_for_json(e: EvidenceObject) -> str:
"""Format one evidence object for the JSON-mode LLM prompt.
2026-04-30: header uses the prompt-facing ``pointer_id`` (E1, E2,
…) — same as claim_lattice_pointer mode — instead of the
content-addressed ``evidence_id`` (long hex). The change closes a
real failure mode: small models (Hermes-3-8B observed) were
fabricating plausible-looking content-addressed IDs (e.g.
``E1b6e396`` when the runtime had ``Eed1b6e396``) →
``UNKNOWN_EVIDENCE_ID`` → UNGROUNDED, even when the answer text was
correct. Pointer IDs (``E1`` - ``E10``) are short, enumerable, and
fabrication-obvious — the model can't invent ``E27`` if only
``E1`` - ``E10`` were shown.
The runtime still stores content-addressed ``evidence_id`` in the
cache & run-DAG (resolved on-the-fly in ``verify_claim_lattice_json``);
only the prompt-facing string changes::
=== E1 (Jurassic_Park_(film) | primary_answer_source) ===
<literal span text>
"""
label = (e.title or e.document_uri.rsplit("/", 1)[-1]) or "untitled"
return f"=== {e.pointer_id} ({label} | {e.source_role}) ===\n{e.span}"
[docs]
def render_evidence_map_for_json(evidence: list[EvidenceObject]) -> str:
"""Concatenated evidence blocks for JSON mode."""
return "\n\n".join(render_evidence_block_for_json(e) for e in evidence)
[docs]
def evidence_map_by_pointer_id(
evidence: list[EvidenceObject],
) -> dict[str, EvidenceObject]:
"""Index by prompt-facing pointer_id (E1, E2, …)."""
return {e.pointer_id: e for e in evidence}
[docs]
def evidence_map_by_evidence_id(
evidence: list[EvidenceObject],
) -> dict[str, EvidenceObject]:
"""Index by content-addressed evidence_id (E1f8e4c2a, …)."""
return {e.evidence_id: e for e in evidence}
[docs]
def render_claim_lattice(
claims: list[dict],
by_id: dict[str, EvidenceObject],
*,
window: int = 200,
) -> str:
"""Convert structured claims to human-readable prose with literal spans.
Each claim becomes one bullet line followed by inlined evidence
excerpts. The model's claim text is rendered verbatim; each cited
pointer_id is followed by a **spotlight excerpt** of the literal
source span — a window of ``window`` chars centered on the first
content token from the claim that appears in the span. Falls back
to the leading window when no claim token matches.
Why spotlight over leading-N truncation: when the cited evidence is
a whole article and the model lazy-anchors every claim at the same
pointer, the leading-N strategy displayed the same article-intro
sentence under every claim. The spotlight finds the part of the
span the claim is *about* — "Brachiosaurus appears in the film" +
a 15 KB article span gets a window centered on the first
"brachiosaurus" mention, not the production-history opener. Same
cited evidence id, but the displayed text actually supports
different claims differently.
``by_id`` is the pointer_id → EvidenceObject index — what
``evidence_map_by_pointer_id`` returns. Determinism: same
(claims, by_id, window) → same prose, byte-for-byte. Unknown ids
render as ``[<id>: ?]`` so violations are visible at a glance.
"""
lines: list[str] = []
for c in claims:
text = (c.get("text") or "").strip()
if not text:
continue
lines.append(f"- {text}")
for eid in c.get("pointer_ids") or c.get("evidence_ids") or []:
obj = by_id.get(eid)
if obj is None:
lines.append(f' [{eid}: ?]')
continue
# Provenance-clear evidence pointer:
# [E5 | <source title> | <chunk_root prefix>: "<excerpt>"]
# Closes the visual confusion observed 2026-05-01 on the
# Orwell run where `[E5: "..."]` displayed alongside a
# source list whose `[5]` slot was a different document
# (E# is a chunk pointer, not a 1-indexed source rank).
# Title comes from the EvidenceObject (URI tail fallback);
# chunk_root prefix is the first 8 hex chars — enough for
# the operator to disambiguate while staying compact.
label = obj.title or obj.document_uri.rsplit("/", 1)[-1] or "untitled"
chunk_prefix = (obj.chunk_root or "")[:8]
excerpt = _spotlight_excerpt(text, obj.span, window=window)
lines.append(f' [{eid} | {label} | {chunk_prefix}: "{excerpt}"]')
return "\n".join(lines)
# Stopwords for spotlight content-token extraction. Smaller set than
# verify._ENGLISH_STOPWORDS — we only need to filter the words the
# model is most likely to share between claim text & every span. A
# handful of high-frequency 4+ char fillers is enough; the spotlight
# falls back to the leading window when no token matches anyway, so
# false negatives degrade gracefully.
_SPOTLIGHT_STOPWORDS = frozenset({
"from", "with", "into", "onto", "upon", "this", "that", "these",
"those", "have", "been", "being", "their", "there", "they",
"them", "your", "what", "when", "where", "while", "which",
"would", "could", "should", "might", "shall", "first", "last",
"also", "very", "much", "many", "more", "most", "less", "some",
"such", "than", "then", "back", "next", "after", "before",
"between", "through", "across", "above", "below", "during",
"without", "within", "until", "since", "about", "around",
"along", "among", "appear", "appears", "appeared", "feature",
"features", "include", "includes", "shown", "shows",
})
_TOKEN_PUNCT_STRIP_R = ".,;:!?\"'()[]{}—-"
def _content_tokens(text: str) -> list[str]:
"""Lowercase content tokens from ``text``, sorted by length desc.
Filters: drop ``< 4`` chars (function words) **unless** the token is
an all-caps 2-3-char acronym in the source text (``CPU``, ``GPU``,
``DNA``, ``FBI``, ``USB`` …) — those are high-signal topical anchors
despite being short, and dropping them is what made "what is a CPU?"
cited to "CPU design" trip ``TITLE_MISMATCH`` (the claim mentions
"CPU", the title mentions "CPU", but neither registered as a content
token). The deflection sidecar already uses a ≥3 floor for exactly
this reason. Also drop a small stopword set, dedup. Sorted
longest-first so the spotlight matches the most specific topical
token before generic ones — for "Brachiosaurus appears in the
film", that's ``brachiosaurus`` ahead of ``film``. (#000053)
"""
seen: set[str] = set()
out: list[str] = []
for raw in text.split():
core = raw.strip(_TOKEN_PUNCT_STRIP_R)
t = core.lower()
is_acronym = 2 <= len(core) <= 3 and core.isalpha() and core.isupper()
if (len(t) < 4 and not is_acronym) or t in _SPOTLIGHT_STOPWORDS or t in seen:
continue
seen.add(t)
out.append(t)
out.sort(key=len, reverse=True)
return out
_SENTENCE_END_RE = re.compile(r"[.!?][\"')\]]?\s+(?=[A-Z\"'(\[])|[.!?][\"')\]]?$")
def _expand_to_sentence_boundaries(span: str, start: int, end: int) -> tuple[int, int]:
"""Expand a (start, end) byte window in ``span`` outward to the
nearest sentence boundaries.
Boundary discovery uses a conservative regex: ``[.!?][\"')\\]]?``
followed by whitespace + capital letter (or end of span). The
result is byte-clean and idempotent — re-expanding an already-
sentence-bounded window returns the same indices.
Returns clamped ``(new_start, new_end)``. If the input is the
whole span or no sentence boundary is detectable in either
direction, the input is returned unchanged.
"""
if start <= 0 and end >= len(span):
return 0, len(span)
# Walk left from `start` to find the previous sentence boundary
# (or start of span). Use the regex's match positions in the
# text leading up to `start`.
new_start = 0
for m in _SENTENCE_END_RE.finditer(span, 0, start):
new_start = m.end()
# Walk right from `end` to find the next sentence boundary (or
# end of span). The first match whose START is >= end is the
# boundary that closes the spotlit sentence.
new_end = len(span)
for m in _SENTENCE_END_RE.finditer(span, end):
new_end = m.end()
break
return new_start, new_end
def _spotlight_excerpt(claim_text: str, span: str, *, window: int) -> str:
"""Return a sentence-bounded excerpt of ``span`` centered on the
first claim-token match.
Pre-2026-05-01 the excerpt was a fixed-width byte window with
leading/trailing ``"..."`` markers; that frequently cut mid-word
and produced excerpts like ``"...freewheeling plot about a boy
and a girl, and the many amazing creatures they have for friends
and p..."`` (the trailing ``p...`` is a half-word truncation). The
new version finds the spotlight token by claim-content-token
match (same as before), then expands outward to the nearest
sentence boundaries — never cuts a word.
Behavior:
1. If ``span`` already fits in ``window``, return it unchanged.
2. Otherwise locate the first content-token match in ``span``.
3. Expand the match position to the surrounding sentence(s).
4. If the expanded sentence range fits within ``2 * window`` bytes
(a soft budget — sentences carry meaning intact), return it
with leading/trailing ellipsis only when the range doesn't
reach the span boundaries.
5. If the expanded range exceeds ``2 * window``, fall back to the
window-centered approach but EXPAND the window's edges to the
nearest WORD boundaries (`\\b`-equivalent) so we never cut a
word.
6. If no claim token matches anywhere, return the leading window
expanded to the nearest sentence end.
Determinism: same (claim, span, window) → same byte-exact output.
"""
if len(span) <= window:
return span
span_lower = span.lower()
tokens = _content_tokens(claim_text)
# Find ALL match positions for ALL content tokens, then pick the
# position whose ±half-window cluster contains the most distinct
# tokens. The Homer-Simpson-boss case (2026-05-01) showed why
# first-match-of-longest-token loses: "homer" matches early in
# voice-actor prose, but the actual answer ("Mr. Burns") is
# deeper in the chunk where boss/burns/homer all cluster
# together. Density picks the load-bearing slice; first-match
# picks whatever phrasing the chunk happens to lead with.
half = window // 2
positions: list[tuple[str, int]] = []
for tok in tokens:
i = 0
while True:
j = span_lower.find(tok, i)
if j < 0:
break
positions.append((tok, j))
i = j + len(tok)
if not positions:
# No content match — return the leading sentence(s) up to
# ~window bytes, expanded to the next sentence end so we
# never cut a word at the boundary.
new_start, new_end = _expand_to_sentence_boundaries(span, 0, min(window, len(span)))
# Cap at 2× window so a runaway sentence doesn't blow the
# excerpt budget.
if new_end - new_start > 2 * window:
new_end = _word_boundary_before(span, new_start + 2 * window)
suffix = "..." if new_end < len(span) else ""
return f"{span[new_start:new_end]}{suffix}"
# Density rank: for each candidate position, count distinct
# tokens whose match positions fall within ±half. Pick the
# position with max distinct count; tie-break on smallest idx
# (deterministic, reproducible).
best_score = -1
match_idx = positions[0][1]
for _tok, idx in positions:
distinct: set[str] = set()
lo, hi = idx - half, idx + half
for tok2, idx2 in positions:
if lo <= idx2 <= hi:
distinct.add(tok2)
score = len(distinct)
if score > best_score or (score == best_score and idx < match_idx):
best_score = score
match_idx = idx
raw_start = max(0, match_idx - half)
raw_end = min(len(span), match_idx + len(tokens[0]) + half)
# Expand outward to sentence boundaries — this is the main change.
new_start, new_end = _expand_to_sentence_boundaries(span, raw_start, raw_end)
# Soft budget cap: if the expanded sentence range is more than
# 2× window, fall back to the windowed form but truncate at WORD
# boundaries so we don't cut a word.
if new_end - new_start > 2 * window:
new_start = _word_boundary_after(span, max(0, match_idx - half))
new_end = _word_boundary_before(span, min(len(span), new_start + 2 * window))
prefix = "..." if new_start > 0 else ""
suffix = "..." if new_end < len(span) else ""
return f"{prefix}{span[new_start:new_end].strip()}{suffix}"
def _word_boundary_after(span: str, idx: int) -> int:
"""Smallest ``i >= idx`` such that ``span[i-1]`` is whitespace or
``i == 0``. Walks forward to a clean word start."""
if idx <= 0:
return 0
while idx < len(span) and not span[idx - 1].isspace():
idx += 1
return idx
def _word_boundary_before(span: str, idx: int) -> int:
"""Largest ``i <= idx`` such that ``span[i]`` is whitespace or
``i == len(span)``. Walks backward to a clean word end."""
if idx >= len(span):
return len(span)
while idx > 0 and not span[idx].isspace():
idx -= 1
return idx