Source code for arborist.qa.evidence

"""Evidence map for claim-lattice-pointer (quote-by-pointer) answer mode.

Builds a deterministic table of evidence objects from already-retrieved
chunks. The model references each object by a short ``pointer_id``
("E1", "E2", ...) which the runtime maps back to a content-addressed
``evidence_id`` for the cache, run-DAG, and audit chain.

Design pinned by G0 ticket (2026-04-29) and the CTI / Clause Lattice
Intelligence reframe:

    Models should not generate verbatim quotes.
    They should point to evidence IDs extracted by deterministic code.

This kills the synthetic-elision class by construction — the model
never types the quote string, so it can't drop characters from one.

Two-layer id scheme:

- ``pointer_id``    short numeric tag the model sees in the prompt and
                     writes back in pointer-line answers. ``E`` + the
                     1-based position of the evidence object in the map
                     (``E1``, ``E2``, …, ``E37``). One BPE token per id
                     in standard tokenizers. Stays in the model's
                     in-distribution citation style.
- ``evidence_id``   content-addressed handle. ``E`` + first 8 hex of
                     ``sha256(chunk_root:offset_start:offset_end)``.
                     Same chunk + same offsets in two runs → same id,
                     forever. The cache_key, run-DAG, and audit chain
                     all use this form so provenance is run-stable.

The verifier (``verify_claim_lattice``) maps each pointer_id the model
writes back to its content-addressed evidence_id before persistence.
The model's literal output is run-dependent (run #1's ``E1`` and run
#2's ``E1`` likely point at different chunks); the content-addressed
layer is what stays stable.

For the first cut every chunk produces exactly one evidence object
covering ``offset_start=0 .. offset_end=len(span)``. Sub-chunk
extraction (paragraph-level, sentence-level) is a future refinement;
the schema already accepts arbitrary offsets so adding finer-grained
splits later doesn't break the contract.
"""

from __future__ import annotations

import hashlib
import re
from dataclasses import asdict, dataclass

from arborist.merkle import MerkleTree


def _sha256_hex(s: str) -> str:
    """Return the hex SHA-256 digest of ``s`` encoded as UTF-8."""
    # Model-output text can carry lone UTF-16 surrogates, which a strict
    # UTF-8 encode rejects; ``surrogatepass`` lets them through. Same
    # rationale as ``arborist.qa.dag._sha256_hex``.
    encoded = s.encode("utf-8", errors="surrogatepass")
    digest = hashlib.sha256(encoded)
    return digest.hexdigest()


@dataclass(frozen=True)
class EvidenceObject:
    """One pinned span the model may reference by ``pointer_id``.

    Immutable record (frozen dataclass). Every field is supplied by the
    caller; nothing is computed inside the class itself. See the
    per-field comments below for what each value carries.
    """

    # Prompt-facing id ("E1", "E2", …): what the model sees and writes
    # back. Position-derived, so run-dependent on purpose.
    pointer_id: str
    # Content-addressed handle ("E" + 8 hex of
    # sha256(chunk_root:start:end)): what the cache, run-DAG, and audit
    # chain use. Run-stable.
    evidence_id: str
    # document_root the chunk belongs to.
    source_root: str
    # Human-readable URI (for the renderer).
    document_uri: str
    # Document title (for the renderer and prompt); may be absent.
    title: str | None
    # Chunk index within the document.
    chunk_idx: int
    # Leaf hash of the chunk.
    chunk_root: str
    # Start offset within the chunk (0 for a whole-chunk object).
    offset_start: int
    # End offset, exclusive.
    offset_end: int
    # Role classification (primary_answer_source / ...).
    source_role: str
    # sha256 of the span (for tamper detection).
    text_hash: str
    # The literal text (what the renderer interpolates).
    span: str

    def to_dict(self) -> dict:
        """Return the fields as a plain ``dict`` keyed by field name."""
        as_mapping = asdict(self)
        return as_mapping
def _evidence_id_for(chunk_root: str, offset_start: int, offset_end: int) -> str:
    """Build the content-addressed evidence id for a chunk span.

    The id is ``"E"`` followed by the first 8 hex characters of the
    SHA-256 of ``"<chunk_root>:<offset_start>:<offset_end>"``, so the
    same chunk root and offsets always yield the same id.
    """
    address = f"{chunk_root}:{offset_start}:{offset_end}"
    digest = _sha256_hex(address)
    short = digest[:8]
    return f"E{short}"
def build_evidence_map(
    chunks: list[dict],
) -> list[EvidenceObject]:
    """Turn retrieved chunks into the evidence table, one object per chunk.

    Each entry of ``chunks`` is a dict read with these keys:

    - required: ``span``, ``chunk_root``, ``source_root``,
      ``document_uri``, ``chunk_idx`` (a missing one raises ``KeyError``)
    - optional: ``title`` (defaults to ``None``) and ``source_role``
      (defaults to ``'unclassified'``)

    Output order matches input order. The 1-based position becomes the
    ``pointer_id`` (``E1``, ``E2``, …); ``evidence_id`` is derived from
    ``chunk_root`` plus the offsets. In this first cut every object
    covers the whole span: ``offset_start=0`` and
    ``offset_end=len(span)``.
    """
    evidence: list[EvidenceObject] = []
    for position, chunk in enumerate(chunks, start=1):
        text = chunk["span"]
        begin = 0
        finish = len(text)
        root = chunk["chunk_root"]
        content_id = _evidence_id_for(root, begin, finish)
        record = EvidenceObject(
            pointer_id=f"E{position}",
            evidence_id=content_id,
            source_root=chunk["source_root"],
            document_uri=chunk["document_uri"],
            title=chunk.get("title"),
            chunk_idx=chunk["chunk_idx"],
            chunk_root=root,
            offset_start=begin,
            offset_end=finish,
            source_role=chunk.get("source_role", "unclassified"),
            text_hash=_sha256_hex(text),
            span=text,
        )
        evidence.append(record)
    return evidence
def evidence_map_root(evidence: list[EvidenceObject]) -> str:
    """Return an order-independent root hash (hex) for the evidence map.

    Each ``evidence_id`` is hashed with SHA-256 and the hex digests are
    sorted before use, so the same set of evidence objects gives the
    same root whatever order retrieval returned them in. Intended as
    the ``evidence_map`` stage hash in the run-DAG.

    - empty input → 64 zero hex characters
    - a single object → that object's leaf digest, returned as-is
    - otherwise → ``MerkleTree.build(leaves).root`` rendered as hex
    """
    if not evidence:
        return "00" * 32

    leaf_digests = [_sha256_hex(item.evidence_id) for item in evidence]
    leaf_digests.sort()

    if len(leaf_digests) == 1:
        only_leaf = leaf_digests[0]
        return only_leaf

    raw_leaves = [bytes.fromhex(digest) for digest in leaf_digests]
    tree = MerkleTree.build(raw_leaves)
    return tree.root.hex()
def render_evidence_block(e: EvidenceObject) -> str:
    """Render one evidence object as a prompt block.

    The block is a header line followed by the literal span::

        === E1 (Jurassic_Park_(film) | primary_answer_source) ===
        <literal span text>

    The header shows the prompt-facing ``pointer_id``, a label, and the
    source role. The label is the title when set, otherwise the part of
    ``document_uri`` after its last ``/``, otherwise ``"untitled"``. The
    content-addressed ``evidence_id`` is never written into the block.
    """
    label = e.title
    if not label:
        # No usable title: fall back to the tail of the URI.
        label = e.document_uri.rsplit("/", 1)[-1]
    if not label:
        label = "untitled"
    header = f"=== {e.pointer_id} ({label} | {e.source_role}) ==="
    return f"{header}\n{e.span}"
def render_evidence_map(evidence: list[EvidenceObject]) -> str:
    """Render every evidence object and join the blocks with a blank line.

    Blocks keep the order of ``evidence``; an empty list gives ``""``.
    """
    blocks = [render_evidence_block(item) for item in evidence]
    separator = "\n\n"
    return separator.join(blocks)
def render_evidence_block_for_json(e: EvidenceObject) -> str:
    """Render one evidence object as a prompt block for JSON mode.

    Output shape::

        === E1 (Jurassic_Park_(film) | primary_answer_source) ===
        <literal span text>

    Since 2026-04-30 the header carries the prompt-facing ``pointer_id``
    (E1, E2, …), the same as claim_lattice_pointer mode, rather than the
    content-addressed ``evidence_id``. Small models (Hermes-3-8B
    observed) were fabricating plausible-looking content-addressed ids
    (e.g. ``E1b6e396`` when the runtime had ``Eed1b6e396``), which led
    to ``UNKNOWN_EVIDENCE_ID`` → UNGROUNDED even when the answer text
    was correct. Pointer ids are short and enumerable, so an invented
    one is obvious.

    Only the prompt-facing string changed: the runtime still stores the
    content-addressed ``evidence_id`` in the cache and run-DAG, resolved
    in ``verify_claim_lattice_json``.
    """
    # Title first, then the URI tail, then a fixed placeholder.
    uri_tail = None if e.title else e.document_uri.rpartition("/")[2]
    label = e.title or uri_tail or "untitled"
    lines = (f"=== {e.pointer_id} ({label} | {e.source_role}) ===", f"{e.span}")
    return "\n".join(lines)
def render_evidence_map_for_json(evidence: list[EvidenceObject]) -> str:
    """Render every evidence object for JSON mode, blank-line separated.

    Blocks keep the order of ``evidence``; an empty list gives ``""``.
    """
    blocks = [render_evidence_block_for_json(item) for item in evidence]
    separator = "\n\n"
    return separator.join(blocks)
def evidence_map_by_pointer_id(
    evidence: list[EvidenceObject],
) -> dict[str, EvidenceObject]:
    """Build a lookup from prompt-facing ``pointer_id`` (E1, E2, …) to object.

    If two objects share a ``pointer_id``, the later one in ``evidence``
    wins.
    """
    index: dict[str, EvidenceObject] = {}
    for item in evidence:
        index[item.pointer_id] = item
    return index
def evidence_map_by_evidence_id(
    evidence: list[EvidenceObject],
) -> dict[str, EvidenceObject]:
    """Build a lookup from content-addressed ``evidence_id`` (E1f8e4c2a, …) to object.

    If two objects share an ``evidence_id``, the later one in
    ``evidence`` wins.
    """
    index: dict[str, EvidenceObject] = {}
    for item in evidence:
        index[item.evidence_id] = item
    return index
def render_claim_lattice(
    claims: list[dict],
    by_id: dict[str, EvidenceObject],
    *,
    window: int = 200,
) -> str:
    """Render structured claims as bullet prose with inlined evidence excerpts.

    Each claim with non-blank ``text`` becomes a ``- <text>`` line (the
    text is stripped, otherwise verbatim). Claims with missing or blank
    text are skipped entirely. Under the bullet comes one line per
    cited id, taken from ``pointer_ids`` or, when that is missing or
    empty, from ``evidence_ids``.

    A cited id found in ``by_id`` renders as::

        [E5 | <source title> | <chunk_root prefix>: "<excerpt>"]

    where the excerpt is a **spotlight excerpt**: the part of the span
    that ``_spotlight_excerpt`` selects for this claim's text, sized by
    ``window``. When the cited evidence is a whole article and every
    claim points at the same id, a fixed leading excerpt would repeat
    the same intro sentence under each claim; the spotlight shows the
    part each claim is actually about.

    An id missing from ``by_id`` renders as ``[<id>: ?]`` so violations
    are visible at a glance.

    ``by_id`` is the pointer_id → EvidenceObject index that
    ``evidence_map_by_pointer_id`` returns.

    Determinism: same (claims, by_id, window) → same prose,
    byte-for-byte.
    """
    rendered: list[str] = []
    for claim in claims:
        claim_text = (claim.get("text") or "").strip()
        if claim_text:
            rendered.append(f"- {claim_text}")
            cited = claim.get("pointer_ids") or claim.get("evidence_ids") or []
            for ref in cited:
                target = by_id.get(ref)
                if target is not None:
                    # Provenance-clear pointer line: id, source title and
                    # chunk_root prefix. Added after the 2026-05-01
                    # Orwell run, where a bare `[E5: "..."]` sat next to
                    # a source list whose `[5]` slot was a different
                    # document (E# is a chunk pointer, not a 1-indexed
                    # source rank). The title falls back to the URI
                    # tail; 8 hex chars of chunk_root are enough for an
                    # operator to disambiguate while staying compact.
                    label = target.title or target.document_uri.rsplit("/", 1)[-1] or "untitled"
                    chunk_prefix = (target.chunk_root or "")[:8]
                    excerpt = _spotlight_excerpt(claim_text, target.span, window=window)
                    rendered.append(f' [{ref} | {label} | {chunk_prefix}: "{excerpt}"]')
                else:
                    rendered.append(f' [{ref}: ?]')
    return "\n".join(rendered)
# Stopwords for spotlight content-token extraction. Smaller set than
# verify._ENGLISH_STOPWORDS — we only need to filter the words the
# model is most likely to share between claim text & every span. A
# handful of high-frequency 4+ char fillers is enough; the spotlight
# falls back to the leading window when no token matches anyway, so
# false negatives degrade gracefully.
_SPOTLIGHT_STOPWORDS = frozenset({
    "from", "with", "into", "onto", "upon",
    "this", "that", "these", "those",
    "have", "been", "being",
    "their", "there", "they", "them", "your",
    "what", "when", "where", "while", "which",
    "would", "could", "should", "might", "shall",
    "first", "last", "also", "very", "much", "many", "more", "most", "less",
    "some", "such", "than", "then", "back", "next",
    "after", "before", "between", "through", "across", "above", "below",
    "during", "without", "within", "until", "since",
    "about", "around", "along", "among",
    "appear", "appears", "appeared", "feature", "features",
    "include", "includes", "shown", "shows",
})

# Characters stripped from both ends of each whitespace-separated token.
_TOKEN_PUNCT_STRIP_R = ".,;:!?\"'()[]{}—-"


def _content_tokens(text: str) -> list[str]:
    """Lowercase content tokens from ``text``, sorted by length desc.

    Tokens are the whitespace-separated words of ``text`` with
    surrounding punctuation stripped. A token is dropped when:

    - it is shorter than 4 characters, **unless** it is an all-caps,
      all-alphabetic 2-3 character acronym in the source text (``CPU``,
      ``GPU``, ``DNA``, ``FBI``, ``USB`` …). Those are high-signal
      topical anchors despite being short; dropping them is what made
      "what is a CPU?" cited to "CPU design" trip ``TITLE_MISMATCH``.
    - it is in ``_SPOTLIGHT_STOPWORDS``; or
    - it was already seen (dedup on the lowercased form).

    The result is sorted longest-first (stable, so equal-length tokens
    keep their order of first appearance). That way the spotlight
    matches the most specific topical token before generic ones — for
    "Brachiosaurus appears in the film", ``brachiosaurus`` comes ahead
    of ``film``. (#000053)
    """
    seen: set[str] = set()
    out: list[str] = []
    for raw in text.split():
        core = raw.strip(_TOKEN_PUNCT_STRIP_R)
        t = core.lower()
        # Acronym test runs on the original casing, before lowercasing.
        is_acronym = 2 <= len(core) <= 3 and core.isalpha() and core.isupper()
        if (len(t) < 4 and not is_acronym) or t in _SPOTLIGHT_STOPWORDS or t in seen:
            continue
        seen.add(t)
        out.append(t)
    out.sort(key=len, reverse=True)
    return out


# A sentence end is `.`, `!` or `?`, optionally followed by a closing
# quote/bracket, and then either whitespace followed by something that
# can open a sentence (capital letter, quote, opening bracket) or the
# end of the text. Deliberately conservative.
_SENTENCE_END_RE = re.compile(r"[.!?][\"')\]]?\s+(?=[A-Z\"'(\[])|[.!?][\"')\]]?$")


def _expand_to_sentence_boundaries(span: str, start: int, end: int) -> tuple[int, int]:
    """Expand the window ``[start, end)`` of ``span`` outward to the
    nearest sentence boundaries.

    A boundary is the end position of a ``_SENTENCE_END_RE`` match, i.e.
    the index where the next sentence begins. The new start is the last
    boundary at or before ``start`` (or 0 when there is none); the new
    end is the first boundary at or after ``end`` (or ``len(span)``
    when there is none).

    Boundaries are always found by matching against the full ``span``.
    Limiting the search with ``endpos`` would truncate the text as far
    as the regex is concerned: ``$`` would match at the cut and the
    lookahead could not see the next sentence's first character. That
    invents a boundary after any ``.`` sitting just before ``start``
    (inside ``U.S.``, say) and misses a real boundary that ends exactly
    at ``start``.

    The expansion is idempotent: re-expanding an already
    sentence-bounded window returns the same indices.

    Args:
        span: The text being excerpted.
        start: Window start index into ``span``.
        end: Window end index into ``span`` (exclusive).

    Returns:
        ``(new_start, new_end)``, clamped to ``0 .. len(span)``. When
        the input window already covers the whole span the result is
        ``(0, len(span))``.
    """
    if start <= 0 and end >= len(span):
        return 0, len(span)

    new_start = 0
    new_end = len(span)
    for m in _SENTENCE_END_RE.finditer(span):
        boundary = m.end()
        if boundary <= start:
            # Still left of the window: remember the closest one so far.
            new_start = boundary
        elif boundary >= end:
            # First boundary that closes the window; nothing later can
            # be nearer.
            new_end = boundary
            break
        # Boundaries strictly inside the window are ignored.
    return new_start, new_end


def _spotlight_excerpt(claim_text: str, span: str, *, window: int) -> str:
    """Return an excerpt of ``span`` centered on where the claim's
    content tokens cluster, cut at sentence boundaries where possible.

    Before 2026-05-01 the excerpt was a fixed-width window with
    ``...`` markers, which frequently cut mid-word. This version
    expands to sentence boundaries and only falls back to word
    boundaries when the sentence range is too long.

    Behavior:

    1. If ``span`` already fits in ``window``, return it unchanged.
    2. Otherwise find every occurrence of every content token of
       ``claim_text`` (case-insensitive) and pick the occurrence whose
       ±``window // 2`` neighbourhood contains the most distinct
       tokens; ties go to the smallest index.
    3. Expand the window around that occurrence to the surrounding
       sentence boundaries.
    4. If the expanded range fits within ``2 * window`` (a soft budget),
       return it stripped, with a leading / trailing ``...`` only on a
       side that does not reach the edge of the span.
    5. If it exceeds ``2 * window``, fall back to a window starting
       ``window // 2`` before the match and at most ``2 * window``
       long, with both edges moved to word boundaries. When there is no
       whitespace to snap to (CJK text, a very long URL), the hard
       window edge is used, so the matched token is never dropped.
    6. If no token matches anywhere, return the leading window expanded
       to the next sentence end (capped at ``2 * window``), with a
       trailing ``...`` when it stops short of the span end.

    Determinism: same (claim, span, window) → same output.
    """
    if len(span) <= window:
        return span

    # NOTE(review): match indices are found in the lowercased copy and
    # then used on ``span``; this assumes ``lower()`` preserves length,
    # which does not hold for a few characters (e.g. U+0130).
    span_lower = span.lower()
    tokens = _content_tokens(claim_text)

    # Find ALL match positions for ALL content tokens, then pick the
    # position whose ±half-window cluster contains the most distinct
    # tokens. The Homer-Simpson-boss case (2026-05-01) showed why
    # first-match-of-longest-token loses: "homer" matches early in
    # voice-actor prose, but the actual answer ("Mr. Burns") is deeper
    # in the chunk where boss/burns/homer all cluster together.
    half = window // 2
    positions: list[tuple[str, int]] = []
    for tok in tokens:
        i = 0
        while True:
            j = span_lower.find(tok, i)
            if j < 0:
                break
            positions.append((tok, j))
            i = j + len(tok)

    if not positions:
        # No content match — return the leading sentence(s) up to
        # ~window, expanded to the next sentence end.
        new_start, new_end = _expand_to_sentence_boundaries(span, 0, min(window, len(span)))
        # Cap at 2× window so a runaway sentence doesn't blow the
        # excerpt budget.
        if new_end - new_start > 2 * window:
            hard_end = new_start + 2 * window
            new_end = _word_boundary_before(span, hard_end)
            if new_end <= new_start:
                # No whitespace to snap to: hard cut, not an empty excerpt.
                new_end = min(len(span), hard_end)
        suffix = "..." if new_end < len(span) else ""
        return f"{span[new_start:new_end]}{suffix}"

    # Density rank: for each candidate position, count distinct tokens
    # whose match positions fall within ±half. Pick the position with
    # the max distinct count; tie-break on smallest idx (deterministic).
    # ``match_len`` follows the winning occurrence so the raw window is
    # sized by the token that matched there, not by the longest token.
    best_score = -1
    match_idx = positions[0][1]
    match_len = len(positions[0][0])
    for tok, idx in positions:
        lo, hi = idx - half, idx + half
        score = len({tok2 for tok2, idx2 in positions if lo <= idx2 <= hi})
        if score > best_score or (score == best_score and idx < match_idx):
            best_score = score
            match_idx = idx
            match_len = len(tok)

    raw_start = max(0, match_idx - half)
    raw_end = min(len(span), match_idx + match_len + half)

    # Expand outward to sentence boundaries.
    new_start, new_end = _expand_to_sentence_boundaries(span, raw_start, raw_end)

    # Soft budget cap: if the expanded sentence range is more than
    # 2× window, fall back to the windowed form but truncate at WORD
    # boundaries so we don't cut a word.
    if new_end - new_start > 2 * window:
        hard_start = max(0, match_idx - half)
        new_start = _word_boundary_after(span, hard_start)
        if new_start > match_idx:
            # Snapping forward walked past the match (no whitespace in
            # between); keep the hard edge.
            new_start = hard_start
        hard_end = min(len(span), new_start + 2 * window)
        new_end = _word_boundary_before(span, hard_end)
        if new_end < match_idx + match_len:
            # Snapping backward would cut off the matched token.
            new_end = hard_end

    prefix = "..." if new_start > 0 else ""
    suffix = "..." if new_end < len(span) else ""
    return f"{prefix}{span[new_start:new_end].strip()}{suffix}"


def _word_boundary_after(span: str, idx: int) -> int:
    """Smallest ``i >= idx`` such that ``span[i-1]`` is whitespace or
    ``i == 0``. Walks forward to a clean word start.

    Returns ``len(span)`` when no whitespace is found from ``idx - 1``
    onward.
    """
    if idx <= 0:
        return 0
    while idx < len(span) and not span[idx - 1].isspace():
        idx += 1
    return idx


def _word_boundary_before(span: str, idx: int) -> int:
    """Largest ``i <= idx`` such that ``span[i]`` is whitespace or
    ``i == len(span)``. Walks backward to a clean word end.

    Returns 0 when no whitespace is found at or before ``idx``.
    """
    if idx >= len(span):
        return len(span)
    while idx > 0 and not span[idx].isspace():
        idx -= 1
    return idx