"""Multi-source corpus Q&A.
Pose a question, the tree finds related cached docs, assembles them as context,
asks Hermes, caches the answer.
The flow:
1. FTS5 search across all shards (chunks_fts can't be UNION'd in views,
so we query each shard's index independently and merge by score).
2. Title-boost rerank: hits whose title contains query tokens get a
score bump. Title is a strong topical signal that BM25 alone misses
(BM25 favors short docs with rare body tokens — Tell_(poker) outranks
Back_to_the_Future without a title boost).
3. Pick top-K distinct documents within a character budget.
4. Compute `context_root` = Merkle root over the sorted source
document_roots — that's the "source" dimension of v9.8's 8-dim
cache_key for this multi-source answer.
5. Cache lookup; hit returns the persisted audit_mode.
6. Miss calls Hermes via the OpenAI-compatible client, then runs the
faithfulness check (``verify_quotes``) — every double-quoted span
in the answer is verbatim-matched against the assembled context.
Result classifies the answer as STRICT (every quote >=1 verified
against context), HYBRID (some claims sourced, some emergent /
training-derived), or UNGROUNDED (no quotes verify — purely
emergent).
7. Persist record with merkle_proof = {context_root, sources: [...]},
audit_mode, and unverified_quotes (the spans the model produced
that didn't appear in any source — corpus-growth signal).
Per-source proofs are not bundled here (the source roots themselves are
already content-addressed). A verifier asks the shards for any specific
chunk's proof on demand.
"""
from __future__ import annotations
import json
import os
import re
import time
import unicodedata
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from pathlib import Path
from arborist import (
CANONICALIZATION_VERSION,
CHUNKING_VERSION,
SCHEMA_VERSION,
)
from arborist.compress import unpack_chunk
from arborist.merkle import MerkleTree
from arborist.qa.client import ChatClient
from arborist.qa.prompts import (
CLAIM_LATTICE_GROUNDING_REMINDER,
CLAIM_LATTICE_JSON_GROUNDING_REMINDER,
CLAIM_LATTICE_JSON_SYSTEM_PROMPT,
CLAIM_LATTICE_SYSTEM_PROMPT,
)
from arborist.qa.concepts import (
has_compare_phrasing,
rivalry_excluded,
synonym_expand,
synonym_expand_strict,
)
from arborist.qa.keys import (
DEFAULT_FIDELITY,
DEFAULT_QUESTION_DEDUP,
FIDELITY_MODES,
QUESTION_DEDUP_MODES,
cache_key,
canonical_question,
conversation_hash,
governance_policy_hash,
model_profile_hash,
question_hash,
verifier_policy_hash,
)
from arborist.qa.dag import build_run_dag
from arborist.qa.frame import FrameDetection, detect_frame as _detect_frame
from arborist.qa.progress import Progress, disabled as _progress_disabled
from arborist.qa.retrieval_plan import RetrievalPlan, retrieval_plan_hash
from arborist.qa.evidence import (
build_evidence_map,
evidence_map_root,
render_evidence_map,
render_evidence_map_for_json,
)
from arborist.qa.repair import mechanical_repair, reprompt_repair
from arborist.qa.verify import (
ANSWER_MODES,
CLAIM_LATTICE_JSON_SCHEMA,
verify_claim_lattice_json,
DEFAULT_ANSWER_MODE,
verify_claim_lattice,
verify_quotes,
)
try:
from arborist.wikitext import BASE_VERSION as _WIKITEXT_BASE_VERSION
from arborist.wikitext import to_base as _wikitext_to_base
except ImportError: # pragma: no cover
_WIKITEXT_BASE_VERSION = None
_wikitext_to_base = None
from arborist.search import FTS5Backend
from arborist.store import (
append_audit,
connect,
discover_shards,
transaction,
)
_TITLE_TOKEN_RE = re.compile(r"[A-Za-z][A-Za-z0-9]*")
# Hyphen-joined run of two-or-more word tokens. Used by
# `_hyphen_fold_variants` to emit joined-no-hyphen variants. See
# Ticket #000007 for the FTS5 hyphen-tokenization-asymmetry rationale:
# `Bipolar disorder` indexes as [bipolar], `Bi-Polar (album)` indexes
# as [bi, polar, ...]. Without the fold, query "bi-polar" hits only
# the album cluster.
_HYPHEN_RUN_RE = re.compile(
r"[A-Za-z][A-Za-z0-9]*(?:-[A-Za-z][A-Za-z0-9]*)+"
)
# Kept in sync with FTS5 stopwords in arborist.search.fts5 — both filter
# the same set of question-shaping words. "tell" leaking into title-LIKE
# search caused "tell me about permacomputer" to pull Tell_(poker), the
# Tell-Tale_Heart movie, Tell_City Indiana, etc.
_TITLE_STOPWORDS = frozenset(
"""
the a an is are was were be been being of to in on at for with by from
as about into through during and or but not no nor so yet too very also just
what who where when why how which this that these those such i you he she
it we they me him her us them do does did have has had can could should
would will may might
tell show describe explain summarize say give list find make please
all there know everything anything something
""".split()
)
def _hyphen_fold_variants(s: str) -> set[str]:
"""For each hyphen-joined run of word tokens in ``s``, emit the
joined-no-hyphen form. Lets retrieval reach indexed forms that
survived the FTS5 hyphen-split asymmetry (Ticket #000007):
"bi-polar is rare?" -> {"bipolar"}
"high-school co-op" -> {"highschool", "coop"}
"plain query" -> set()
Stopword and length filters mirror ``_title_query_tokens`` so a
junk fold like "of-the" -> "ofthe" never enters the candidate set.
"""
out: set[str] = set()
for run in _HYPHEN_RUN_RE.findall(s):
joined = run.replace("-", "").lower()
if len(joined) > 1 and joined not in _TITLE_STOPWORDS:
out.add(joined)
return out
# Numeral-fold (measured 2026-05-18): ordinal-word query ("Alexander
# the second") never reaches a Roman-numeral title ("Alexander II")
# because `second` != `ii`. Mined-fixture recall@8 = 22/40 = 55 %,
# 18 clean misses all this shape. Same additive+symmetric mechanism
# as `_hyphen_fold_variants` (#000007). Scope is honest: only
# multi-char Romans survive the universal `len > 1` token filter, so
# single-char Romans (I/V/X — "Charles V") are out of reach by token-
# fold alone (a deeper change; ~4 of the 18, separately measurable).
# Strict 2..40 set → no English-word collision ("did"/"mix"/"civil"
# are not Romans here); single-char forms intentionally absent.
_NUM_ORD_TO_ROMAN = {
"second": "ii", "third": "iii", "fourth": "iv", "sixth": "vi",
"seventh": "vii", "eighth": "viii", "ninth": "ix", "eleventh": "xi",
"twelfth": "xii", "thirteenth": "xiii", "fourteenth": "xiv",
"fifteenth": "xv", "sixteenth": "xvi", "seventeenth": "xvii",
"eighteenth": "xviii", "nineteenth": "xix", "twentieth": "xx",
}
_NUM_ROMAN_TO_ORD = {v: k for k, v in _NUM_ORD_TO_ROMAN.items()}
def _numeral_fold_variants(s: str) -> set[str]:
"""Symmetric ordinal-word <-> multi-char-Roman fold.
"who was Alexander the second?" -> {"ii"}
"Alexander II" -> {"second"}
"plain query" -> set()
Additive (mirrors `_hyphen_fold_variants`): callers union this
into the token set, so existing matches are preserved and a
folded query token can now also overlap a Roman-numeral title.
Stopword/length filtered like `_title_query_tokens`.
"""
out: set[str] = set()
for tok in _TITLE_TOKEN_RE.findall(s):
tl = tok.lower()
v = _NUM_ORD_TO_ROMAN.get(tl) or _NUM_ROMAN_TO_ORD.get(tl)
if v and len(v) > 1 and v not in _TITLE_STOPWORDS:
out.add(v)
return out
def _ascii_fold(s: str) -> str:
"""Strip combining diacritics: 'Béla Bartók' -> 'Bela Bartok'."""
return "".join(
ch for ch in unicodedata.normalize("NFKD", s)
if not unicodedata.combining(ch)
)
def _accent_fold_variants(s: str) -> set[str]:
"""ASCII-folded word variants for diacritic text.
"Béla Bartók" -> {"bela", "bartok"}
"what is X?" -> set() (already ASCII — no-op)
Why this is load-bearing, not cosmetic: `_TITLE_TOKEN_RE` is
`[A-Za-z]…`, so a diacritic title fragments ("Béla" -> "B","la")
and never matches the ASCII form a user types. Folding then
re-tokenising recovers the clean tokens. Additive+symmetric,
same discipline as `_hyphen_fold_variants` (#000007) /
`_numeral_fold_variants`: a pure-ASCII `s` folds to itself ->
empty -> zero effect on non-accent queries/titles. Measured
2026-05-18 (fold-search #1; 8.1% of corpus titles carry
diacritics).
"""
folded = _ascii_fold(s)
if folded == s:
return set()
return {
t.lower()
for t in _TITLE_TOKEN_RE.findall(folded)
if t.lower() not in _TITLE_STOPWORDS and len(t) > 1
}
# Honorific-fold: a user types "Mt/St/Dr Everest"; the title spells
# "Mount/Saint/Doctor Everest" (or vice versa). Measured 2026-05-18
# (mined ground-truth fixture, no fold): recall@1 only 45% / @8 62%,
# 15/40 misses — large headroom, no existing fold. Same additive+
# symmetric discipline as the numeral/accent folds. Bidirectional so
# either surface form reaches the other; strict closed set (no
# English-word collision). "st" maps to {saint} only — "street" is
# deliberately excluded: the measured class is honorific-titled and
# folding street here would add noise for ~zero recall (additive but
# precision-aware, the single-char-Roman lesson).
_HONOR_FOLD = {
"mount": "mt", "saint": "st", "doctor": "dr", "fort": "ft",
"general": "gen", "president": "pres", "captain": "capt",
"senator": "sen", "mister": "mr", "professor": "prof",
}
_HONOR_FOLD.update({v: k for k, v in _HONOR_FOLD.items()})
def _honorific_fold_variants(s: str) -> set[str]:
out: set[str] = set()
for tok in _TITLE_TOKEN_RE.findall(s):
v = _HONOR_FOLD.get(tok.lower())
if v and len(v) > 1 and v not in _TITLE_STOPWORDS:
out.add(v)
return out
# British<->American spelling fold. Measured 2026-05-18 (mined
# ground-truth, no fold): recall@1 50% / @8 70%, 12/40 misses —
# real headroom, no existing fold. Token-level (the British form
# IS the title token: "Labour", "Organisation", "Centre"). Same
# additive+symmetric discipline; strict closed set.
_BRIT_FOLD = {
"colour": "color", "honour": "honor", "behaviour": "behavior",
"organisation": "organization", "defence": "defense",
"centre": "center", "theatre": "theater", "catalogue": "catalog",
"programme": "program", "labour": "labor", "favour": "favor",
"licence": "license", "neighbour": "neighbor",
}
_BRIT_FOLD.update({v: k for k, v in _BRIT_FOLD.items()})
def _brit_fold_variants(s: str) -> set[str]:
out: set[str] = set()
for tok in _TITLE_TOKEN_RE.findall(s):
v = _BRIT_FOLD.get(tok.lower())
if v and len(v) > 1 and v not in _TITLE_STOPWORDS:
out.add(v)
return out
# Single source of truth for the active `_title_query_tokens` fold
# set. Bound into the run-DAG retrieval plan (RetrievalPlan
# .title_token_policy) so a replay knows which token-normalization
# produced the retrieved sources (Dav1d review 2026-05-19; #000001-
# family provenance, run-DAG not governance). Bump on any fold
# add/remove/semantics change.
_TITLE_TOKEN_POLICY = "tt-v2:hyphen+numeral+accent+honorific+brit"
def _title_query_tokens(s: str) -> set[str]:
base = {
t.lower()
for t in _TITLE_TOKEN_RE.findall(s)
if t.lower() not in _TITLE_STOPWORDS and len(t) > 1
}
# Hyphen-fold: additively include joined-no-hyphen variants for
# hyphenated runs in `s`. Symmetric — the function is called on
# both queries and titles, and additive fold preserves existing
# match patterns (e.g. `Coca-Cola history` query keeps {coca,
# cola, cocacola, history} so a `Coca-Cola` title still passes
# title-breadth via {coca, cola, cocacola}). See Ticket #000007.
base |= _hyphen_fold_variants(s)
# Numeral-fold: ordinal-word <-> Roman-numeral, same additive+
# symmetric discipline (measured 2026-05-18; see above).
base |= _numeral_fold_variants(s)
# Accent-fold: ASCII-folded variants of diacritic words. The
# token regex is [A-Za-z]+, so an accented title ("Béla Bartók")
# otherwise fragments into junk and never matches the ASCII form
# a user types. Additive+symmetric, same discipline; no-op when
# `s` is already ASCII (measured 2026-05-18; fold-search #1,
# 8.1% of titles). See `_accent_fold_variants`.
base |= _accent_fold_variants(s)
# Honorific-fold: Mt/St/Dr <-> Mount/Saint/Doctor, same additive+
# symmetric discipline (measured 2026-05-18; baseline recall@1 45%).
base |= _honorific_fold_variants(s)
# British<->American spelling, same discipline (baseline @1 50%).
base |= _brit_fold_variants(s)
return base
def _rerank_by_title(
hits: list,
question: str,
boost: float = 10.0,
*,
shards_dir=None,
) -> list:
"""Boost hits whose title overlaps query tokens. Pure ordering aid.
Uses the synonym-expanded token set so canonical-article titles
that share zero literal-query tokens but contain expansion tokens
(Wikipedia's `Graphics processing unit` for a `GPU` query) earn
the boost. Without this, the literal-query satellites (`GPU
cluster`) get +10 and the canonical article gets 0 — the exact
failure mode #000054 Phase 2 is closing. (Falls back to literal
qtokens when `shards_dir is None`.)
"""
qtokens = _title_query_tokens(question)
if not qtokens:
return hits
if shards_dir is not None:
qtokens = synonym_expand(qtokens, shards_dir=shards_dir)
for h in hits:
if not h.title:
continue
ttokens = _title_query_tokens(h.title.replace("_", " "))
overlap = len(qtokens & ttokens)
if overlap:
h.score += overlap * boost
hits.sort(key=lambda h: -h.score)
return hits
def _filter_by_title_relevance(
hits: list,
question: str,
*,
core_match_roots: set[str] | None = None,
body_density_check: callable | None = None,
phrase_match_roots: set[str] | None = None,
hyphen_fold_anchors: set[str] | None = None,
fallback_top_n: int = 5,
shards_dir=None,
) -> list:
"""Concept-aware relevance filter with five accept paths:
1. Title-token overlap (after synonym expansion). Strongest signal.
2. TF-IDF core keyword overlap — `core_match_roots` is a precomputed
set of source document_roots whose TF-IDF cores contain query
tokens. Closes the gap for neologisms like "permacomputer" that
never appear in titles but are distinctive enough to be TF-IDF
keywords of conversation bodies.
3. Body density — docs mentioning the query token >= N times pass
even without title or core match. Cheap proxy for "actually about
the topic." `body_density_check(hit)` returns bool.
4. Phrase-match — docs whose body contains a verbatim 4+ token
sequence from the question pass even when title and content
tokens don't overlap. Closes the allusion gap (2026-05-01
Orwell case): "has oceania always been at war with east asia"
has zero token overlap with the title "Nineteen Eighty-Four"
but the body contains the verbatim phrase "always been at
war" — without this accept path, the phrase-route hit gets
filtered out before it can rerank into the top-K. The
upstream phrase route already gates on 4-token-min sequences
(see _question_phrases) so false-positive risk is low.
5. Hyphen-fold anchor — when the question has hyphenated runs
(Ticket #000007), `hyphen_fold_anchors` is the joined-form
set ({"bipolar"} for "bi-polar is rare?"). Title-side stem
overlap with this anchor passes the filter even when the
breadth threshold fails. Rescues non-hyphen titles like
`Bipolar disorder` from rejection while leaving non-hyphen
queries (anchors empty) unaffected.
Rivalry exclusion (Intel-titled docs in AMD queries) still applies on
every accept path.
If all five accept paths together produce nothing, fall back to the
top `fallback_top_n` body-BM25 hits — the LLM gets enough context to
say "I don't know" rather than fabricating from a single tangential
source.
"""
qtokens = _title_query_tokens(question)
if not qtokens:
return hits
qtokens_stem = {_stem_token_for_match(t) for t in qtokens}
accept = synonym_expand(qtokens, shards_dir=shards_dir)
exclude = rivalry_excluded(
qtokens,
compare_phrasing=has_compare_phrasing(question),
shards_dir=shards_dir,
)
core_roots = core_match_roots or set()
phrase_roots = phrase_match_roots or set()
anchor_stems = (
{_stem_token_for_match(a) for a in hyphen_fold_anchors}
if hyphen_fold_anchors
else set()
)
# Title-overlap breadth threshold scales with query length, mirroring
# _body_density_passes: ≤2 tokens require ALL, 3+ require N-1. Without
# this, a 2-token query like "supermans girlfriend" admits docs that
# share only ONE token with the title (e.g. `Girlfriends` the TV show)
# — title-overlap fires first & body-density never gets to reject.
title_breadth = len(qtokens) if len(qtokens) <= 2 else len(qtokens) - 1
kept: list = []
for h in hits:
ttokens = _title_query_tokens(h.title.replace("_", " ")) if h.title else set()
ttokens_stem = {_stem_token_for_match(t) for t in ttokens}
if exclude & ttokens:
continue # rivalry: opposing-side title, drop it
# Direct stem-aware match against query tokens (each qtoken must
# be present, possessive/plural-tolerant). Strict signal.
direct_matches = len(qtokens_stem & ttokens_stem)
if direct_matches >= title_breadth:
kept.append(h)
continue
# Synonym fallback only for 1-token queries — otherwise a single
# synonym hit (e.g. "amd" matching an "intel"-titled doc via the
# Intel/AMD group) would over-recall.
if len(qtokens) == 1 and accept & ttokens:
kept.append(h)
continue
if h.document_root in core_roots:
kept.append(h)
continue
if h.document_root in phrase_roots:
kept.append(h)
continue
# Accept-path 5: hyphen-fold anchor (Ticket #000007). The
# joined-form variant from a hyphenated query token (e.g.
# "bipolar" from "bi-polar") matching the title's stem set
# is enough signal to pass — rescues `Bipolar disorder` from
# the breadth gate when the query was "bi-polar is rare?".
# Empty anchor set on non-hyphen queries — zero side effect.
if anchor_stems and (anchor_stems & ttokens_stem):
kept.append(h)
continue
if body_density_check is not None and body_density_check(h):
kept.append(h)
continue
if not kept:
return hits[: max(1, fallback_top_n)] if hits else []
return kept
DEFAULT_QUERY_POLICY = {
# Ticket #000007 — query-layer hyphen-fold marker. Folds into
# `governance_policy_hash` so records produced under the new
# rule (`Bipolar disorder` reachable from query "bi-polar")
# cache-split cleanly from pre-fold records. Code applies the
# fold unconditionally; this flag exists to make the policy
# transition observable from the cache_key alone.
"hyphen_fold_v1": True,
# Ticket #000006 amend 2026-05-02b (Rule 9) — premise-parroting /
# subject-tokens-absent demote. Threshold = number of question∩
# claim content tokens that must be absent from the union of
# cited evidence spans before STRICT → HYBRID. Default 3 keeps
# the signal unambiguous (single-token absence is often a
# stem-variant near-miss; three+ is the parroting fingerprint).
# Folds into verifier_policy_hash + governance_policy_hash.
"claim_lattice_subject_tokens_absent_threshold": 3,
"system_prompt": (
"You are answering a question using ONLY the sources provided below. "
"Each source is delimited by '=== Source: <URI> ===' headers.\n\n"
"GROUNDING RULE (most important):\n"
"For EVERY factual claim in your answer, include a verbatim quote "
"from a source enclosed in double quotes (\"...\"). The quoted span "
"must appear word-for-word in one of the sources. Make a claim "
"only when a verbatim quote in one of the sources supports it. "
"Quote the source word-for-word inside the double quotes.\n\n"
"ATTRIBUTION RULES:\n"
"1. A single source may discuss MULTIPLE products, companies, or "
"competitors. Read carefully and attribute facts only to the entity "
"the source explicitly names for that fact.\n"
"2. When the question asks about ONE specific entity (e.g., 'fastest "
"AMD CPU'), keep facts about that entity in the answer; leave facts "
"about competitors (Intel, Pentium) out, even when the source "
"discusses both — they are different products that happen to share "
"an article.\n"
"3. When referencing model numbers like 'Athlon XP 3200+', remember "
"that AMD's PR-rating numbers (3200+, 2500+) are model labels, not "
"the clock speed in MHz. Quote the source's own wording for clock "
"speeds; let the source state the speed.\n"
"4. When the sources contain the answer, write it. When the answer "
"is absent from the sources, say 'I don't know based on the "
"provided sources.' and stop there. Stay inside the sources at "
"all times — let the corpus speak."
),
# Restated rule fired as a user message immediately before the sources
# turn. Hermes (and most instruction-tuned 8B models) follow recent
# user-turn instructions more reliably than a system-turn rule that
# decays under long context. Repetition is not redundancy — it raises
# the prior on the response shape we want.
"grounding_reminder": (
"REMINDER: wrap every factual claim in double quotes (\"...\") "
"and the quoted span must appear word-for-word in one of the "
"sources. Each claim earns a verbatim quote. "
"Example:\n\n"
" Q: who founded Apple?\n"
" Sources: ...Apple Inc. was founded by Steve Jobs, Steve "
"Wozniak, and Ronald Wayne in 1976...\n"
" A: Apple's founders are named in the source: \"Apple Inc. was "
"founded by Steve Jobs, Steve Wozniak, and Ronald Wayne\".\n\n"
"Now answer the question on the next message."
),
"temperature": 0.1,
"top_p": 1.0,
"max_tokens": 768,
# Entity-path policy for the faithfulness verifier. See
# arborist/qa/verify.py:ENTITY_POLICIES. Default 'proximity' promotes
# to STRICT only if N verified entities cluster within W chars of
# each other in source — separating "source documents these entities
# as a group" (cast list / infobox / roster) from "source mentions
# them incidentally in scattered prose". Lives in policy so changing
# it bumps governance_policy_hash and invalidates cache cleanly.
"entity_policy": "proximity",
"entity_proximity_n": 3,
"entity_proximity_window": 300,
# Strip wikitext markup before the LLM ever sees the context. Lets
# Hermes quote prose verbatim and shrinks token bills (~43% on
# Wikipedia chunks). Bumps governance_policy_hash so prior cached
# answers under raw-wikitext policy stay distinct on lookup. No-op
# if mwparserfromhell isn't installed.
"base_version": _WIKITEXT_BASE_VERSION,
# Mechanical answer repair after first verify. Off by default so
# existing callers don't see answer text mutate. When on:
# synthetic_elision splits, trailing_artifact trims, and no_overlap
# claim drops are applied deterministically; the repaired answer is
# re-verified & persisted (cache_key inputs unchanged, only
# answer_text differs from what the LLM produced). One audit event
# `providence_repair` records the pre→post transition. Bumps
# governance_policy_hash so on/off agents share no cache silos.
"repair_enabled": False,
# Maximum re-prompt iterations after mechanical repair. 0 = no
# re-prompt (mechanical only). 1 = at most one extra LLM call
# asking the model to rewrite around the failed quotes.
"repair_max_reprompts": 0,
# G0 / CTI — claim-lattice-pointer answer mode. See
# arborist/qa/runner.py:DEFAULT_POLICY for full semantics. Default
# "quote" preserves existing behavior; "claim_lattice_pointer"
# instructs the runtime to build an evidence map, show short
# pointer ids (E1, E2, …) to the model, and accept pointer-line
# output ("Claim. [E12]") that the verifier maps back to
# content-addressed evidence_ids for the cache & run-DAG.
"answer_mode": DEFAULT_ANSWER_MODE,
"claim_lattice_system_prompt": CLAIM_LATTICE_SYSTEM_PROMPT,
"claim_lattice_grounding_reminder": CLAIM_LATTICE_GROUNDING_REMINDER,
# Reference-frame polarity preamble (Ticket #000002 / Module L).
# Injected as an additional user-role message before the
# grounding_reminder when `detect_frame` classifies the query as
# `reference` (allusion-shape query whose phrase route surfaced a
# fictional / reference-work source). The preamble nudges the
# model toward multi-frame answers — distinguish what the cited
# work depicts as actual continuity from what in-universe
# propaganda or characters claim. Pure prompt-side hint;
# verifier still runs the same hard checks. Empty string disables
# the augmentation. Folds into governance_policy_hash so two
# policies with different preamble text produce different
# cache_keys.
"claim_lattice_polarity_preamble": (
"This question may be a reference to {reference_title}. When "
"you answer, distinguish what the cited work depicts as "
"actual continuity from what in-universe propaganda or "
"characters claim within it. Cite evidence for each "
"substantive claim using the pointer format above."
),
"claim_lattice_allowed_source_roles": [
"primary_answer_source",
"secondary_context_source",
"background_source",
"unclassified",
# Self-reference: STRICT-trusted-as-fact unless falsified.
# See docs/self-reference-design.md.
"self_reference_source",
],
"claim_lattice_max_pointers_per_claim": 2,
# Cap on evidence blocks (chunks) per retrieved source. Default 2.
# Without this cap, a long Wikipedia article alone can split into
# ~20 chunks, each becoming a separate E* — the model then sees
# E1-E26 for what is really 5 sources and writes a mega-claim
# citing all of them. With the cap, 5 sources × 2 = 10 evidence
# blocks max. Chunks within each source are still relevance-ranked
# (distinct_query_tokens DESC, total_mentions DESC, chunk_idx ASC)
# so the cap keeps the most-relevant 2 chunks. Folds into
# governance_policy_hash; changing the cap invalidates prior
# cached records.
"claim_lattice_max_chunks_per_source": 2,
"claim_lattice_min_citation_coverage": 0.30,
"claim_lattice_min_claim_content_tokens": 2,
"claim_lattice_lazy_anchor_demote_threshold": 0.5,
"claim_lattice_lazy_anchor_demote_min_pairs": 3,
"claim_lattice_warrant_check_enabled": True,
# Deflection check: subject-anchor heuristic catches "model deflected
# to an adjacent grounded question" (e.g. "who burns the amazon
# river?" answered about rainforest deforestation, "river" never
# in answer). Promoted from sidecar to soft-demote 2026-05-02 —
# DEFLECTION_DETECTED downgrades EVIDENCE-WARRANTED → ANCHOR-
# WARRANTED via the existing soft-demote ladder path.
"claim_lattice_deflection_check_enabled": True,
"claim_lattice_format_collapse_check_enabled": True,
# Ticket #000008 Phase 2 — quantifier preflight guard. See
# arborist/qa/runner.py:DEFAULT_POLICY for the full rationale.
# Phase 2 lands with apply_caps=False (dry-run); cap is
# reported on result dict but not applied to the verifier.
"quantifier_guard_enabled": True,
"quantifier_guard_apply_caps": False,
# Mode allowlist when apply_caps flips True. See runner.DEFAULT_POLICY
# for the n=5 bench data driving the JSON-only default.
"quantifier_apply_caps_modes": ["claim_lattice"],
"quantifier_caps_by_intensity": {},
"quantifier_guard_modes": ["claim_lattice_pointer", "claim_lattice"],
# Phase 3 — default ON for lattice modes per the 2026-05-03 bench
# A/B (#000008 §12). See runner.DEFAULT_POLICY for full rationale.
"quantifier_reminder_enabled": True,
# Phase 4 — strict reject for broad-unbounded. See runner.py for
# rationale. Default OFF.
"quantifier_reject_broad": False,
# Ticket #000010 — see runner.DEFAULT_POLICY for rationale.
"metacognition_enabled": True,
"metacognition_temporal_check": True,
"metacognition_contradiction_check": True,
"metacognition_false_premise_check": True,
"metacognition_out_of_corpus_check": True,
"metacognition_block_on_contradiction": False,
# Ticket #000011 — soft preflight sidecar. See runner.DEFAULT_POLICY
# for full rationale. Default OFF.
"soft_preflight_enabled": False,
# Ticket #000001 §7 Phase 0 — deterministic cross-language guard.
# Default OFF (dry-run rollout discipline, same as the #000008
# quantifier guard / #000011 soft preflight / #000049 NLI): the
# feature is provably inert on English and strictly improves the
# cross-language case, but the default-flip is a separate
# bench-gated decision fox makes, not an autonomous one. Flip ON
# per-call to experiment (CLI `--crosslang-guard`, `make query
# XLANG=1`). Hash invariants (artifact — `governance_policy_hash`
# is sha256 of the *whole* policy, keys.py:182): NOT in
# `question_hash` (the user's question is preserved) and NOT in
# `verifier_policy_hash` (verifier byte-identical); like every
# policy flag it DOES change `governance_policy_hash`, which
# correctly partitions the cache by guard state (a guard-on answer
# must not be served to a guard-off lookup). The fail-closed path
# writes no cache row anyway.
"crosslang_guard_enabled": False,
# Ticket #000056 — Operation Sandwich. Requires
# crosslang_guard_enabled (it rides the Phase-0 signal). Default
# OFF, same rollout discipline. When ON + signal fires + the [mt]
# engine is available: query es→en (retrieval + LLM prompt),
# English answer through the UNTOUCHED verifier, en→es of the
# verified answer into display-only fields. NOT in question_hash
# (user's question preserved) nor verifier_policy_hash (verifier
# byte-identical); like any policy flag it does move
# governance_policy_hash (whole-policy hash → correct cache
# partition). MT *engine identity* binds into the run-DAG
# retrieval plan (RetrievalPlan.mt_*), not any policy hash.
"crosslang_translate_enabled": False,
# #000056 §9 — entity-preserving MT (mask proper nouns → translate
# → restore). Default **OFF**: the isolated win (Paul of Tarsus)
# did NOT replicate at bench scale — mask measured net-negative
# (es 71%→65%, fr −38pp; the lowercased bench gives it nothing to
# grab on the en side and the sentinels perturb opus-mt). The
# no-mask sandwich is the better config; mask stays available
# behind the flag for the corpus-title-anchor follow-up only.
# crosslang_source_lang names the bread (es default; fr/ru bench).
"crosslang_entity_mask": False,
"crosslang_source_lang": "es",
# Claim-count ceiling — see runner.DEFAULT_POLICY for rationale.
# Bench finding (york-england "tell me all there is to know")
# caught the runaway shape; cap of 12 admits entity-list
# questions while flagging encyclopedic spam. Folds into
# governance_policy_hash on change.
"claim_lattice_max_claims_per_answer": 12,
# JSON variant — `answer_mode="claim_lattice"`. Pairs with grammar-
# constrained inference (vLLM guided_json, Claude/GPT-4 native
# JSON, Qwen 3.6 reasoner). Lenient pre-parser keeps the path
# survivable on inference paths without grammar guidance. Toggling
# `claim_lattice_use_guided_json=False` disables the extra_body
# pass for endpoints that 400 on unknown fields.
"claim_lattice_json_system_prompt": CLAIM_LATTICE_JSON_SYSTEM_PROMPT,
"claim_lattice_json_grounding_reminder": CLAIM_LATTICE_JSON_GROUNDING_REMINDER,
"claim_lattice_use_guided_json": True,
"claim_lattice_json_stop_sequences": ["\n\n"],
# Per-mode context-budget defaults. Sprint 1b (2026-05-02) bench
# measured each answer mode's peak strict-rate bucket on a sweep
# over 8 KB → 1 MB context budgets. Previous flat default of
# 60 KB was past peak for both quote and pointer modes (which
# degrade after 32 KB) and only marginally above peak for the
# JSON variant (which keeps climbing into the 32-64 KB bucket).
# Defaults below are the mid of each mode's peak bucket rounded
# to nice numbers:
# quote → 16-32KB peak → 24000
# claim_lattice_pointer → 16-32KB peak → 24000
# claim_lattice (JSON) → 32-64KB peak → 48000
# Selected by `query()` when the caller does not pass an explicit
# `max_context_chars`. Any explicit caller value still wins
# (backward-compatible at the API boundary). The mapping itself
# lives in policy so changes fold into governance_policy_hash and
# partition the cache namespace cleanly. See
# docs/qa-modes-bench.md "recommended context budget".
"max_context_chars_by_mode": {
"quote": 24000,
"claim_lattice_pointer": 24000,
"claim_lattice": 48000,
},
# Verifier content-token rules version (#000053). See
# arborist/qa/runner.py:DEFAULT_POLICY for the rationale — keeps
# all-caps 2-3-char acronyms as content tokens; folds into
# verifier_policy_hash so prior cache records orphan on lookup.
"content_token_rules": "v2-acronym-aware",
}
@dataclass
class _Hit:
document_root: str
document_uri: str
title: str | None
score: float
shard_path: str
chunk_idx: int
source_role: str = "unclassified"
# Heuristic role classification + per-role budget multiplier. Lets the
# primary answer page (e.g. `Jurassic Park (film)` for a JP film query)
# claim a wider context slice than peripheral pages (`Jurassic Park (film
# score)`, `Jurassic Park video games`). The running `char_budget` check
# still bounds total context to `max_context_chars`; weights just shift
# how the budget gets divided.
SOURCE_ROLE_BUDGET_WEIGHTS = {
"primary_answer_source": 2.0,
"secondary_context_source": 1.0,
"noisy_background_source": 0.5,
"sequel_background_source": 0.5,
"background_source": 1.0,
"unclassified": 1.0,
# Self-promoted providence records (STRICT live, past kindergarten
# window). Same budget weight as background — Wikipedia stays the
# canonical primary; self-reference is supplementary anchoring.
# Trust model: STRICT-as-fact unless the verifier falsifies it.
"self_reference_source": 1.0,
}
# Title patterns that demote a source's role. Lower-cased substring match.
# 2026-04-30: extended to catch tie-in spinoff titles. The JP-dinosaurs
# query lazy-anchored on "Jurassic Park: Operation Genesis" (a video game)
# whose enumerative dinosaur tables pattern-matched the question shape
# more cleanly than the actual film article's prose. Adding the explicit
# game subtitle plus generic markers ("the game", "video games") so
# similar tie-ins classify as noisy and drop out of the evidence map.
_NOISY_TITLE_MARKERS = (
"score", "music", "soundtrack", "video game", "video games",
"merchandise", "discography", "operation genesis", "the game",
)
_SEQUEL_TITLE_MARKERS = (
" iii", " ii)", " ii ", " iv", " v ", " v)", "lost world", "sequel",
" 2)", " 3)", " 4)",
)
_SECONDARY_TITLE_MARKERS = (
"list of", "characters", "franchise", "history of", "people",
"timeline of",
)
def _classify_source_role(
title: str | None,
qtokens_stem: set[str],
*,
document_uri: str | None = None,
) -> str:
"""Tag a source by its likely role for an N-token query.
URI-scheme classification fires first: documents whose URI starts
with ``arborist://providence/`` are self-promoted providence
records (per ``arborist/sources/providence.py``) and classify as
``self_reference_source`` regardless of title shape — that role
captures the trust model "STRICT-as-fact unless verifier
falsifies."
Order matters for the title-based fallback: noisy/sequel/secondary
markers fire first because they catch peripheral pages whose
titles otherwise overlap query tokens fully (e.g. `Jurassic Park
(film score)` shares 3 stems with `{dinosaur, jurassic, park,
film}` but is not the primary answer source for a dinosaurs
question). Primary requires the strongest title coverage (N-1
of N stems present).
"""
if document_uri and document_uri.startswith("arborist://providence/"):
return "self_reference_source"
if not title:
return "unclassified"
t = title.lower()
if any(k in t for k in _NOISY_TITLE_MARKERS):
return "noisy_background_source"
if any(k in t for k in _SEQUEL_TITLE_MARKERS):
return "sequel_background_source"
if any(k in t for k in _SECONDARY_TITLE_MARKERS):
return "secondary_context_source"
title_tokens = _title_query_tokens(t.replace("_", " "))
title_stems = {_stem_token_for_match(tok) for tok in title_tokens}
if qtokens_stem and len(title_stems & qtokens_stem) >= max(1, len(qtokens_stem) - 1):
return "primary_answer_source"
return "background_source"
def _search_titles(conn, qtokens: list[str], limit: int) -> list[tuple]:
"""SQL LIKE over documents.title — finds the HTTP article that FTS5
misses because list-pages with many URLs have higher 'http' term
frequency than the actual protocol article. Returns rows shaped
to match the FTS5 hit tuple.
Bug fix 2026-05-02 (fox case: ``"what date did back to the
future come out?"``): the previous SQL had no ``ORDER BY``, so
SQLite returned rows in arbitrary internal order and ``LIMIT``
truncated before the actual title-token-overlap winners landed.
The film article ``"Back to the Future"`` (title contains
``back`` AND ``future``) lost the LIMIT race to substring-match
junk like ``"Out (poker)"`` (matches ``out`` substring),
``"Aberdeen, South Dakota"`` (``south`` contains ``out``),
``"Backplane"`` (``back`` substring), etc.
Fix: ``ORDER BY LENGTH(title) ASC`` (shorter titles win the
LIMIT race; title-purity proxy) and bump LIMIT to ``limit * 4``
so the post-filter (word-boundary stem-aware token-set
intersect) sees enough candidates to surface genuine matches
even on long-question OR'd LIKE queries.
The earlier 2026-05-02 v1 of this fix used a per-row
``CASE WHEN ... THEN 1 ELSE 0 END + ...`` token-hit score in
SQL, which broke on long questions (fox 2026-05-02:
18-content-token query → SQLite "Expression tree is too large
(maximum depth 1000)" — each ``CASE WHEN`` is multiple
expression-tree nodes; ``+``-chained N times exceeded the
bound). The simplified shape is universal: short queries see
a tight LIMIT, long queries see ``limit * 4`` candidates the
caller's post-filter ranks via stem-aware token intersection.
Title-length-asc keeps the LIMIT race biased toward
title-pure matches.
Also: cap the OR-chain at MAX_TITLE_LIKE_TOKENS to bound the
expression-tree depth. Beyond ~24 tokens the post-filter is
doing all the work anyway; extra LIKEs just inflate the
candidate set without adding signal.
"""
if not qtokens:
return []
over_fetch_limit = limit * 4
# Try FTS5 documents_fts first — O(K) hash lookup vs the un-indexable
# LOWER(title) LIKE '%tok%' that this function used through 2026-
# 05-02. The MATCH expression OR-joins the input tokens (quoted to
# neutralize FTS5 syntax). Empirical: ~0.05s/shard regardless of
# token count, vs. 10-15s for the LIKE form on the 870k-doc shard.
# Falls back to LIKE only on shards whose documents_fts isn't
# populated yet (e.g. legacy ingest before this index landed).
bounded = list(qtokens)[:24]
has_fts = conn.execute(
"SELECT 1 FROM documents_fts WHERE rowid = (SELECT MIN(rowid) FROM documents_fts) LIMIT 1"
).fetchone()
if has_fts:
# Quote each token & OR-join. FTS5's tokenizer applies the same
# porter stemming we use elsewhere, so 'permacomputer' / 'permac'
# match coherently.
match_expr = " OR ".join(f'"{t.lower().replace(chr(34), chr(34) * 2)}"' for t in bounded)
try:
# Rank by FTS5 bm25 ascending (lowest=best match) — titles
# that match MORE of the OR'd expanded tokens float to the
# top. This replaces the prior 2026-05-02 ``LENGTH(title)
# ASC`` tie-break, which only worked when qtokens were 1-3
# tokens: with the #000054 Phase 2b expansion (qtokens
# union synonym-pool) the candidate set explodes, length-
# asc returns "Unit" / "Unite" / "B unit" / … and the
# multi-token canonical title ("Graphics processing unit"
# = 3 expanded-token hits) gets cut by ``LIMIT``. bm25
# naturally favors multi-match titles because each OR
# clause that hits contributes to the score. The
# substring-junk problem the length-asc fix solved
# (2026-05-02 "Back to the Future" case) doesn't apply
# here: FTS5 MATCH is tokenized — "out" only matches the
# tokenized word "out", not the substring of "Aberdeen,
# South Dakota". Length-asc stays as the LIKE-fallback
# tie-break below (where the substring issue persists).
rows = conn.execute(
"SELECT d.document_root, d.document_uri, d.title "
"FROM documents_fts AS f "
"JOIN documents AS d ON d.rowid = f.rowid "
"WHERE documents_fts MATCH ? "
"ORDER BY bm25(documents_fts) ASC, LENGTH(d.title) ASC "
"LIMIT ?",
(match_expr, over_fetch_limit),
).fetchall()
return rows
except Exception:
# Malformed MATCH (rare; tokenizer-strange chars survived
# the quote escape). Fall through to LIKE form.
pass
# LIKE fallback — kept for shards lacking documents_fts data.
# Capped at 24 tokens so the OR-chain expression-tree stays under
# SQLite's depth-1000 limit on long queries.
clauses = " OR ".join(["LOWER(title) LIKE ?"] * len(bounded))
likes = [f"%{t.lower()}%" for t in bounded]
params = likes + [over_fetch_limit]
rows = conn.execute(
f"SELECT document_root, document_uri, title FROM documents "
f"WHERE {clauses} "
f"ORDER BY LENGTH(title) ASC LIMIT ?",
params,
).fetchall()
return rows
def _question_phrases(question: str, *, n: int = 4) -> list[str]:
"""Extract verbatim n-token sliding-window phrases from the question.
Used by the phrase-pattern retrieval route to catch allusions /
idioms / fictional-world references whose diagnostic signal is
the EXACT sequence including function words. Stopword stripping
would kill this:
"always been at war" — diagnostic Orwell signal
"always war" — generic, useless
So we DON'T strip stopwords here. Skip phrases whose tokens are
all ≤ 3 chars (pure boilerplate, no diagnostic value). Output
is lowercase, deduped, in source order. Default ``n=4`` is the
sweet spot empirically: 3-grams are too noisy ("the cat in"
matches loads of things), 5-grams miss shorter idioms ("winter
is coming" → 3 tokens).
"""
import re as _re
tokens = _re.findall(r"[A-Za-z][A-Za-z0-9]+", question)
if len(tokens) < n:
return []
out: list[str] = []
seen: set[str] = set()
for i in range(len(tokens) - n + 1):
window = tokens[i:i + n]
if max(len(t) for t in window) < 4:
continue # all-short-tokens → boilerplate
phrase = " ".join(t.lower() for t in window)
if phrase in seen:
continue
seen.add(phrase)
out.append(phrase)
return out
def _search_phrases(conn, phrases: list[str], limit: int) -> list[tuple]:
"""FTS5 phrase-match search across chunk bodies.
Each phrase becomes an FTS5 quoted-phrase token (``'"phrase"'``);
we OR the phrases so any verbatim match wins. Returns sqlite3.Row
objects with columns ``document_root, idx, document_uri, title``
matching the rest of the search-route surface.
Why this exists: AND-mode FTS5 token-matching (the default body
search route) treats query tokens independently — a doc must
contain every token but the tokens can be anywhere. For
allusion-shape queries the diagnostic signal is the verbatim
sequence:
Q = "has oceania always been at war with east asia"
body BM25 surfaces literal-geography articles (Oceania, Asia,
Far East) because they have the most occurrences of "Oceania"
+ "Asia" + "war" individually.
phrase MATCH '"always been at war"' surfaces
Nineteen_Eighty-Four because the phrase is verbatim Orwell.
The verbatim phrase route doesn't dominate: in ``_search_corpus``
its score is 70 (between core-keyword and title-LIKE ranks), and
the existing rerank pipeline still gates by title relevance.
Phrase matches just get a seat at the table.
Defensive: silently skip phrases containing double-quotes
(adversarial / malformed input). Wraps the FTS5 query in a
try/except so a malformed MATCH doesn't crash the search;
upstream callers see an empty result set.
"""
if not phrases:
return []
quoted = [f'"{p}"' for p in phrases if '"' not in p]
if not quoted:
return []
fts_query = " OR ".join(quoted)
try:
rows = conn.execute(
"""
SELECT
c.document_root,
c.idx,
d.document_uri,
d.title
FROM chunks_fts AS f
JOIN chunks AS c ON c.chunk_id = f.rowid
JOIN documents AS d ON d.document_root = c.document_root
WHERE chunks_fts MATCH ?
ORDER BY bm25(chunks_fts) ASC
LIMIT ?
""",
(fts_query, limit),
).fetchall()
except Exception: # noqa: BLE001 — search must fail soft
rows = []
return rows
def _docs_with_core_keyword_match(
conn, qtokens: list[str], limit: int
) -> list[tuple]:
"""Find SOURCE docs whose TF-IDF core keywords contain a query token.
TF-IDF cores act as enriched titles: a doc's distinctive low-frequency
terms get distilled into the core's content as comma-separated keywords.
A query for "permacomputer" (a neologism that never makes it into a
title) can match the TF-IDF core of a Grok conversation that mentioned
it, because permacomputer is a rare term that ranks high under TF-IDF.
Returns rows of (document_root, document_uri, title) for the SOURCE docs
(not the cores) — the source is what gets fed to the LLM as context.
"""
if not qtokens:
return []
# Word-boundary match against the comma-separated TF-IDF keyword list.
# Prepending/appending ", " lets one LIKE pattern (`%, token, %`) check
# for the token regardless of its position in the keyword string.
# Without this, naive `LIKE '%intel%'` would match "intelligence",
# "intellectual", "intellivision" — drowning real hits like Pentium_4
# (whose TF-IDF core has "intel" as an exact keyword) in noise.
#
# Per-row `match_count` tallies how many distinct query tokens hit
# this doc's TF-IDF core. Multi-token coverage is a strong relevance
# signal — a doc whose core has "intel" + "cpu" + "faster" beats a
# doc whose only signal is "intel" appearing in its TITLE. The
# caller boosts the score by match_count.
case_clauses = " + ".join(
[
"(CASE WHEN LOWER(', ' || c.content || ', ') LIKE ? THEN 1 ELSE 0 END)"
]
* len(qtokens)
)
where_clauses = " OR ".join(
["LOWER(', ' || c.content || ', ') LIKE ?"] * len(qtokens)
)
patterns = [f"%, {t.lower()}, %" for t in qtokens]
params = patterns + patterns + [limit]
rows = conn.execute(
f"""
SELECT
src.document_root,
src.document_uri,
src.title,
MAX({case_clauses}) AS match_count
FROM chunks c
JOIN documents core ON core.document_root = c.document_root
JOIN derivations der ON der.core_root = core.document_root
JOIN documents src ON src.document_root = der.src_root
WHERE core.source_type LIKE 'core:tfidf-%'
AND ({where_clauses})
GROUP BY src.document_root, src.document_uri, src.title
ORDER BY match_count DESC
LIMIT ?
""",
params,
).fetchall()
return rows
def _stem_token_for_match(t: str) -> str:
"""Light suffix-strip for query-token vs body matching.
Two normalizations:
possessive ``"superman's" -> "supermans" -> "superman"`` (the apostrophe
is already gone via _TITLE_TOKEN_RE; we drop the trailing
``s`` here so the lookup matches plain ``superman`` in body).
plural ``"powers" -> "power"``, ``"girlfriends" -> "girlfriend"``
so plural questions match singular source mentions.
Both are the same operation: strip trailing ``s`` for tokens > 4 chars.
Conservative on short tokens (``"is"``, ``"as"``, ``"us"`` would lose
meaning) and on tokens that don't end in ``s`` (no-op).
"""
if len(t) > 4 and t.endswith("s") and not t.endswith("ss"):
return t[:-1]
return t
def _body_count_with_stem(body: str, t: str) -> int:
"""Count mentions of ``t`` in ``body``, falling back to the lite-stemmed
form if the literal didn't match. Returns the LARGER of the two counts
so a query token that appears under both forms (rare) still scores."""
n_literal = body.count(t)
if n_literal:
return n_literal
stem = _stem_token_for_match(t)
if stem != t:
return body.count(stem)
return 0
def _chunk_query_relevance(
span: str, qtokens_stem: set[str]
) -> tuple[int, int]:
"""Rank one chunk by query-token overlap. Returns
``(distinct_present, total_mentions)``.
Stem-aware: same ``_body_count_with_stem`` we use elsewhere, so
``"supermans"`` matches ``"superman"`` in the chunk text. Soft
signal — the score never enters the proof path; it only orders
chunks within a source so the most-relevant chunk gets the lowest
pointer id (and the model's lazy-anchor habit lands on a useful
chunk by accident).
Sort callers should walk ``(-distinct, -total, chunk_idx_asc)``
to break ties stably toward document order.
"""
if not qtokens_stem:
return (0, 0)
body = span.lower()
counts = {t: _body_count_with_stem(body, t) for t in qtokens_stem}
distinct_present = sum(1 for n in counts.values() if n > 0)
total = sum(counts.values())
return (distinct_present, total)
def _body_density_passes(
conn, document_root: str, qtokens: set[str], min_mentions: int = 3
) -> bool:
"""Body-token CO-OCCURRENCE relevance.
Pre-2026-04-27 this counted any token's mentions in the body (so
Intel_8086, with 30 "Intel" mentions, would pass for a query of
{intel, fastest, cpu} despite never mentioning "fastest"). Pre-2026-
04-29 the threshold was "at least HALF" — too lenient for 2-token
queries: ``"supermans girlfriend"`` admitted ``Girlfriends`` (TV
show) which had ``girlfriend`` in body but no ``superman`` whatsoever.
Current rules — breadth scales with query length:
≤ 2 tokens require ALL of them present in body
3+ tokens require N - 1 (allow one weak signal token to miss)
Plus depth (``total_mentions >= min_mentions``) is still enforced.
Token matching is stem-tolerant (``_stem_token_for_match``): so a query
token ``"supermans"`` matches body ``"superman"`` and ``"girlfriends"``
matches body ``"girlfriend"``.
"""
if not qtokens:
return False
rows = conn.execute(
"SELECT content FROM chunks WHERE document_root = ? AND content IS NOT NULL",
(document_root,),
).fetchall()
if not rows:
return False
body = " ".join(unpack_chunk(r["content"]) or "" for r in rows).lower()
counts = {t: _body_count_with_stem(body, t.lower()) for t in qtokens}
distinct_present = sum(1 for n in counts.values() if n > 0)
total_mentions = sum(counts.values())
if len(qtokens) <= 2:
breadth_threshold = len(qtokens)
else:
breadth_threshold = len(qtokens) - 1
return distinct_present >= breadth_threshold and total_mentions >= min_mentions
def _search_one_shard(
p: Path,
*,
question: str,
over_fetch: int,
or_synonym_pool: list[str],
accept_tokens: set[str],
accept_stems: set[str],
progress: Progress,
) -> tuple[
list[tuple],
dict[str, str],
set[str],
set[str],
]:
"""Run all four search routes against a single shard.
Returns (raw_rows, root_to_shard, core_match_roots,
phrase_match_roots) — shard-local buffers that the caller merges.
Each shard worker opens its own connection so this is safe to
invoke from a thread pool.
"""
shard_name = p.name
raw: list[tuple] = []
root_to_shard: dict[str, str] = {}
core_match_roots: set[str] = set()
phrase_match_roots: set[str] = set()
sp_str = str(p.resolve())
conn = connect(p)
try:
backend = FTS5Backend(conn)
_route_hits = 0
for h in backend.search(
question, limit=over_fetch, extra_or_tokens=or_synonym_pool
):
raw.append(
(
h.score,
h.document_root,
h.document_uri,
h.title,
h.chunk_idx,
sp_str,
)
)
root_to_shard[h.document_root] = sp_str
_route_hits += 1
progress.emit(
f"search.shard.{shard_name}.fts5_body", hits=_route_hits
)
# Parallel title search using synonym-expanded tokens.
# The score starts deliberately low so this signal can't drown
# FTS5 BM25 + body relevance. _rerank_by_title later adds
# `overlap*10` to every hit (FTS5 and title-search alike) that
# has title-token overlap, so a doc whose title genuinely IS
# the topic ends up rewarded twice (once here, once in rerank).
# Title search now uses documents_fts FTS5 index (~0.05s/shard
# regardless of token count) — the prior >5-token bypass
# existed because LIKE '%tok%' was O(corpus × |tokens|).
# FTS5 MATCH makes this an O(K) hash lookup, so synonym-
# expanded title search is affordable at any query length.
title_search_rows = _search_titles(
conn, list(accept_tokens), over_fetch
)
_title_hits = 0
for r in title_search_rows:
title_norm = (r["title"] or "").replace("_", " ")
title_stems = {
_stem_token_for_match(t)
for t in _title_query_tokens(title_norm)
}
overlap = len(accept_stems & title_stems)
if overlap == 0:
continue
title_score = overlap * 10.0
raw.append(
(
title_score,
r["document_root"],
r["document_uri"],
r["title"],
0,
sp_str,
)
)
root_to_shard[r["document_root"]] = sp_str
_title_hits += 1
progress.emit(
f"search.shard.{shard_name}.title", hits=_title_hits
)
# Phrase-pattern search — verbatim multi-token sequences from
# the question. Catches allusions / idioms / fictional-world
# references whose diagnostic signal is the exact sequence
# including function words. Two passes for layered specificity:
# - n=6 (highest specificity): "oceania always been at war
# with" is essentially unique to Orwell. Score 100.
# - n=5 (high specificity): "oceania always been at war"
# still strongly Orwell-anchored. Score 90.
# 4-grams were tried (2026-05-01) and dropped: too noisy.
_phrase_hits_total = 0
for n in (6, 5):
phrase_score = 100.0 if n == 6 else 90.0
for r in _search_phrases(
conn, _question_phrases(question, n=n), over_fetch
):
raw.append(
(
phrase_score,
r["document_root"],
r["document_uri"],
r["title"],
r["idx"],
sp_str,
)
)
root_to_shard[r["document_root"]] = sp_str
phrase_match_roots.add(r["document_root"])
_phrase_hits_total += 1
progress.emit(
f"search.shard.{shard_name}.phrase", hits=_phrase_hits_total
)
# Core-keyword search: docs whose TF-IDF core keywords match.
# The query token doesn't need to be in title or even in body —
# being a TF-IDF keyword of the doc's core is enough signal.
_kw_hits = 0
for r in _docs_with_core_keyword_match(
conn, list(accept_tokens), over_fetch
):
core_match_roots.add(r["document_root"])
match_count = r["match_count"] or 1
kw_score = 40.0 + 25.0 * match_count
raw.append(
(
kw_score,
r["document_root"],
r["document_uri"],
r["title"],
0,
sp_str,
)
)
root_to_shard[r["document_root"]] = sp_str
_kw_hits += 1
progress.emit(
f"search.shard.{shard_name}.core_keyword", hits=_kw_hits
)
finally:
conn.close()
return raw, root_to_shard, core_match_roots, phrase_match_roots
def _search_corpus(
shards_dir: Path | None,
single_db: Path | None,
question: str,
over_fetch: int,
*,
progress: Progress | None = None,
) -> list[_Hit]:
"""Two parallel searches across shards, merged:
- FTS5 body search (BM25 over chunk content).
- SQL title-LIKE search (catches articles whose body is short on the
query terms but whose title is the literal topic — e.g., the HTTP
protocol article doesn't out-frequency-score URL-heavy list pages
but it IS the topic).
Dedupe by document_root. Title hits get a baseline score that
out-ranks FTS5 body hits so the actual topic article rises to the top.
"""
qtokens = _title_query_tokens(question)
# `accept_tokens` flows into the title-search FTS5 MATCH (line ~1035)
# and the core-keyword route (line ~1099); `accept_stems` is the
# accept-path-1 token-overlap filter in `_filter_by_title_relevance`.
# Pre-#000054 Phase 2b: this was `set(qtokens)` (qtokens only), with
# `or_synonym_pool` used only in the FTS5 body OR-fallback. That
# let the canonical article disappear: "what is a GPU?" expanded to
# {gpu, graphics, processing, unit} via synonym_expand, but only
# `gpu` was used to retrieve titles → the GPU-* satellites filled
# the budget while "Graphics processing unit" never entered the
# candidate pool. Now: use the expanded set in all four routes.
# The expanded set is bounded by `MAX_NEIGHBORS_PER_TOKEN` (8) ×
# |qtokens| capped at `MAX_TOTAL_TOKENS` (50), so the SQL clause
# count stays tractable; the old LIKE perf objection (line 1146
# pre-#000054) was retired with `documents_fts` MATCH replacing
# LIKE in `_search_titles` 2026-05-02.
or_synonym_pool = synonym_expand(qtokens, shards_dir=shards_dir)
# `accept_tokens` flows into title-search FTS5 MATCH + title-rerank
# + core-keyword search + title-relevance filter accept-paths. Use
# the **strict** synonym view (manual + acronym_parens evidence;
# excludes link_reciprocity) — link_reciprocity edges express
# topical adjacency, not synonymy, and injecting them into
# retrieval pulled "Curious George Brigade" into "why did the
# dinosaurs go extinct?" via `dinosaurs ↔ curious/george/brigade`
# reciprocal-link edges (2026-05-13 bench regression). The
# acronym-parens expansion that closes CPU→Central-processing-unit
# / GPU→Graphics-processing-unit is preserved because those edges
# ARE strict. `or_synonym_pool` (the broad view) still feeds the
# FTS5 OR-fallback for long-token surfacing per the prior design.
accept_tokens = synonym_expand_strict(qtokens, shards_dir=shards_dir) or set(qtokens)
paths: list[Path]
if shards_dir is not None:
paths = discover_shards(shards_dir)
elif single_db is not None:
paths = [Path(single_db)]
else:
paths = []
progress = progress or _progress_disabled()
raw: list[tuple] = []
# Roots whose TF-IDF cores contain a query token — collected across
# shards. Used downstream by _filter_by_title_relevance as accept-path 2.
core_match_roots: set[str] = set()
# Roots that matched a verbatim 4+ token phrase from the question.
# Used downstream by _filter_by_title_relevance as accept-path 4 so
# an allusion-shape hit (e.g. Nineteen_Eighty-Four for "always been
# at war") survives the title-token-overlap filter even when its
# title shares no tokens with the question.
phrase_match_roots: set[str] = set()
# Per-shard mapping of doc root -> shard path, for body-density lookups
# in accept-path 3. Lets us reach back to the source shard cheaply.
root_to_shard: dict[str, str] = {}
# Hoisted out of the per-shard loop — accept_stems is shard-invariant.
accept_stems = {_stem_token_for_match(t) for t in accept_tokens}
# Per-shard work is independent: each shard opens its own SQLite
# connection (no shared mutable state until merge) and runs four
# routes whose results we accumulate in shard-local buffers. Fan
# the shards out across a thread pool so the slowest shard caps
# the search wall instead of the sum-of-shards capping it. The
# downstream `raw.sort` + dedup step makes per-shard arrival
# order irrelevant. Worker count is capped via env override
# (`ARBORIST_SHARD_WORKERS`) for cgroup-bound runners; default
# is `min(8, len(paths))` which leaves headroom on commodity
# boxes while delivering ~4× on a 4-shard cluster.
if not paths:
max_workers = 1
else:
env_cap = os.environ.get("ARBORIST_SHARD_WORKERS", "").strip()
if env_cap.isdigit() and int(env_cap) > 0:
max_workers = min(int(env_cap), len(paths))
else:
max_workers = min(8, len(paths))
with ThreadPoolExecutor(max_workers=max_workers) as ex:
futures = [
ex.submit(
_search_one_shard,
p,
question=question,
over_fetch=over_fetch,
or_synonym_pool=or_synonym_pool,
accept_tokens=accept_tokens,
accept_stems=accept_stems,
progress=progress,
)
for p in paths
]
for fut in futures:
shard_raw, shard_r2s, shard_core, shard_phrase = fut.result()
raw.extend(shard_raw)
root_to_shard.update(shard_r2s)
core_match_roots.update(shard_core)
phrase_match_roots.update(shard_phrase)
raw.sort(key=lambda r: -r[0])
seen: set[str] = set()
out: list[_Hit] = []
for score, root, uri, title, idx, sp in raw:
if root in seen:
continue
seen.add(root)
out.append(
_Hit(
document_root=root,
document_uri=uri,
title=title,
score=score,
shard_path=sp,
chunk_idx=idx,
)
)
progress.emit(
"search.merge_dedupe", raw=len(raw), unique=len(out)
)
# Sidecar sets returned alongside the hit list. Pre-2026-05-01
# the function returned a bare list and the caller used
# `getattr(hits, "_core_match_roots", set())` to fish out the
# sets — but the sidecar was never attached, so the
# `core_match_roots` accept-path in _filter_by_title_relevance
# silently received an empty set. Returning a tuple corrects
# the plumbing AND threads the new `phrase_match_roots` for
# accept-path 4.
return out, core_match_roots, phrase_match_roots, root_to_shard
def _rerank(
hits: list[_Hit],
question: str,
*,
core_match_roots: set[str] | None = None,
body_density_check: callable | None = None,
phrase_match_roots: set[str] | None = None,
hyphen_fold_anchors: set[str] | None = None,
shards_dir=None,
progress: Progress | None = None,
) -> list[_Hit]:
"""Filter off-topic, then layer in body-coverage, title-overlap, and
source-role rank boosts.
Order matters: filter first (drops noise), body-coverage rerank next
(counters BM25's short-doc bias by rewarding topical density across
distinct query tokens), title-overlap rerank (breaks ties when a doc
IS the named topic), source-role rank-boost last so a primary
answer source outranks a list-page even when the list-page won on
BM25 + title-overlap (caught the "where is florida" defect:
``List_of_places_in_Florida`` and ``List_of_State_Roads_in_Florida``
each contain "Florida" hundreds of times in row markup, scoring
above the actual ``Florida`` article on body density).
"""
progress = progress or _progress_disabled()
hits = _filter_by_title_relevance(
hits,
question,
core_match_roots=core_match_roots,
body_density_check=body_density_check,
phrase_match_roots=phrase_match_roots,
hyphen_fold_anchors=hyphen_fold_anchors,
shards_dir=shards_dir,
)
progress.emit("search.title_filter", survivors=len(hits))
hits = _rerank_by_body_coverage(hits, question)
hits = _rerank_by_title(hits, question, shards_dir=shards_dir)
hits = _rerank_by_source_role(hits, question)
hits = _rerank_by_title_purity(hits, question, shards_dir=shards_dir)
return _rerank_by_ordered_token_match(hits, question)
# Per-role rank multiplier. Affects sort order, NOT just per-source
# context cap (the latter is SOURCE_ROLE_BUDGET_WEIGHTS, applied later).
# Defaults skew strongly toward primary so a real topic article beats
# list-pages and franchise/sequel siblings even when the list-page wins
# on body-density. Tuned against the JP-dinosaurs and "where is florida"
# defects.
SOURCE_ROLE_RANK_WEIGHTS = {
"primary_answer_source": 2.0,
"secondary_context_source": 0.7,
"background_source": 0.9,
"noisy_background_source": 0.3,
"sequel_background_source": 0.3,
"unclassified": 1.0,
# Self-reference: STRICT live providence records past kindergarten
# window. Treated like background — Wikipedia stays canonical
# primary; self-reference is supplementary anchoring trusted as
# fact unless the verifier falsifies the underlying record.
"self_reference_source": 0.9,
}
def _rerank_by_source_role(hits: list[_Hit], question: str) -> list[_Hit]:
"""Classify each hit by source role and rescale score by role weight.
Mutates ``h.source_role`` so the classification happens once and
downstream context-build code can reuse the value (instead of
re-classifying at cap time). Stable sort by score desc.
"""
qtokens_stem = {
_stem_token_for_match(t.lower())
for t in _title_query_tokens(question)
}
for h in hits:
h.source_role = _classify_source_role(
h.title, qtokens_stem, document_uri=h.document_uri
)
weight = SOURCE_ROLE_RANK_WEIGHTS.get(h.source_role, 1.0)
h.score = h.score * weight
hits.sort(key=lambda h: -h.score)
return hits
def _rerank_by_title_purity(hits: list[_Hit], question: str, *, shards_dir=None) -> list[_Hit]:
"""Boost titles by both purity AND multi-token-match breadth.
Two signals combine here:
- **Purity** = ``|title_tokens ∩ query_tokens| / |title_tokens|``.
Rewards titles that ARE the topic without off-topic suffix tokens.
``Jurassic Park (film)`` (purity 1.0) beats ``Jurassic Park
(NES game)`` (purity 0.5).
- **Overlap count** = ``|title_tokens ∩ query_tokens|``. Rewards
titles that match more of the query's content tokens. For a
query ``{dawson, creek}``: ``List of Dawson's Creek episodes``
(overlap 2) beats ``Clinton Creek, Yukon`` (overlap 1) even
when both have similar purity.
Multiplier: ``(1 + overlap_count) * (1 + purity)``:
overlap=2, purity=0.5 (e.g. ``Dawson's Creek episodes``) → 4.5×
overlap=1, purity=1.0 (bare-token-title match) → 4.0×
overlap=2, purity=0.4 (e.g. ``List of ... Dawson Creek``) → 4.2×
overlap=1, purity=0.5 (e.g. ``Dawson Leery``) → 3.0×
overlap=1, purity=0.33 (e.g. ``Clinton Creek, Yukon``) → 2.67×
overlap=0 → 1.0× (no change)
Pre-2026-05-01 the multiplier was ``1 + 2 * purity`` — purity
only, indifferent to overlap-count. That let ``Clinton Creek,
Yukon`` (purity 0.33 → 1.67× boost) outrank ``List of Dawson's
Creek episodes`` (purity 0.5 → 2.0×) on a query like "in
dawsons creek who is the girl across the creek?" once BM25's
short-title bias is folded in. The 2-token-match should beat
the 1-token-match cleanly.
Original use case (JP-dinosaurs lazy-anchor) still served:
``Jurassic Park (film)`` (overlap 2, purity 1.0) → 6.0× sits
well above ``Jurassic Park (NES game)`` (overlap 2, purity 0.5)
→ 4.5×, and far above ``Jurassic Park (franchise)`` (overlap 2,
purity 0.67) → 5.0×.
"""
qtokens = _title_query_tokens(question)
if not qtokens:
return hits
# #000054 Phase 2b: expand via the *strict* synonym view (manual +
# acronym_parens evidence) — NOT the broad view that includes
# link_reciprocity. The multiplier ``(1+overlap)*(1+purity)``
# amplifies; running it against the broad expansion set blows up
# on noisy link_reciprocity synonyms (observed 2026-05-13 bench:
# "why did the dinosaurs go extinct?" → Curious George Brigade
# titled docs got 4× via `dinosaurs ↔ curious/george/brigade`
# reciprocal-link edges, which express topical adjacency, not
# synonymy). The strict view keeps the CPU↔central-processing-
# unit / GPU↔graphics-processing-unit acronym surfacing — those
# are text-pattern edges (acronym_parens) where the phrase
# literally IS the expansion — without amplifying the
# reciprocal-link noise tail.
if shards_dir is not None:
qtokens = synonym_expand_strict(qtokens, shards_dir=shards_dir) or qtokens
# Stem-aware matching so possessive / plural variants match. The
# 2026-05-01 Dawson's Creek defect: question "dawsons creek" with
# title "List of Dawson's Creek episodes" — raw set intersection
# treated `dawsons` and `dawson` as distinct → overlap=1 (only
# `creek`) and the multi-token title bonus didn't fire. Stemming
# both sides via `_stem_token_for_match` (trailing-s strip on
# tokens >4 chars, skipping ss-enders) collapses both forms onto
# `dawson`, the overlap goes to 2, and the title beats single-
# token ``Clinton Creek, Yukon`` matches.
qstems = {_stem_token_for_match(t) for t in qtokens}
for h in hits:
if not h.title:
continue
ttokens = _title_query_tokens(h.title.replace("_", " "))
if not ttokens:
continue
tstems = {_stem_token_for_match(t) for t in ttokens}
overlap = tstems & qstems
if not overlap:
continue
purity = len(overlap) / len(tstems)
overlap_count = len(overlap)
h.score = h.score * (1.0 + overlap_count) * (1.0 + purity)
hits.sort(key=lambda h: -h.score)
return hits
def _ordered_match_length(query_tokens: list[str], title_tokens: list[str]) -> int:
"""Longest contiguous-or-subsequence match of query tokens (in
query order) inside title tokens (in title order).
Implementation: longest common subsequence over the two stem-
aware token lists. Returns the LCS length. The function is
O(N*M) where N = len(query_tokens), M = len(title_tokens). Both
are typically <10 in practice (titles short, query content
tokens few), so the cost is negligible.
Examples (query "red fish blue fish"):
title "Red Fish Blue Fish" → 4 (all four in order)
title "One Fish Two Fish Red Fish" → 4 (red-fish-blue? — no
'blue' in title, so 3
actually: red-fish-fish...
LCS counts longest common
SUBSEQUENCE not contiguous)
title "Red Dwarf" → 1 (red only)
title "Toronto Blue Jays" → 1 (blue only)
title "Blue Velvet (film)" → 1 (blue only)
"""
if not query_tokens or not title_tokens:
return 0
n = len(query_tokens)
m = len(title_tokens)
# 1D rolling dp
prev = [0] * (m + 1)
for i in range(1, n + 1):
cur = [0] * (m + 1)
for j in range(1, m + 1):
if query_tokens[i - 1] == title_tokens[j - 1]:
cur[j] = prev[j - 1] + 1
else:
cur[j] = max(cur[j - 1], prev[j])
prev = cur
return prev[m]
def _rerank_by_ordered_token_match(hits: list[_Hit], question: str) -> list[_Hit]:
"""Boost titles whose tokens appear in the same order as the query.
Multi-token queries like "red fish blue fish" should rank a
"Red Fish Blue Fish"-shaped title above a "Red Dwarf"-shaped
one, even when both pass the existing title-purity check. The
ordered-token-match length distinguishes them: 4 vs 1.
Multiplier: ``1 + 0.5 * (match_length - 1)`` for match_length ≥ 2.
Single-token matches get no boost (already covered by purity).
match_length 1 → 1.0× (no change; single-token rewarded by purity)
match_length 2 → 1.5×
match_length 3 → 2.0×
match_length 4 → 2.5×
Stem-aware (uses ``_stem_token_for_match``) so possessive /
plural / "dawsons" vs "dawson" variants collapse onto the same
stem before LCS.
Caught the 2026-05-01 "plot of red fish blue fish?" defect:
pre-fix, body BM25 + title purity tied across multiple
color/animal-titled docs; with the ordered-match boost, the
Dr. Seuss book's title (purity 1.0, ordered-match 4) sits
cleanly above Red Dwarf, Toronto Blue Jays, etc. (purity 0.5,
ordered-match 1).
"""
qtokens = _title_query_tokens(question)
if len(qtokens) < 2:
# No order to match on a single-token query.
return hits
# Preserve query token ORDER (not the set ordering from
# _title_query_tokens which de-dupes via set comprehension).
# Walk the question text, applying the same regex + filter, so
# the resulting list reflects natural reading order.
qtokens_ordered: list[str] = []
seen: set[str] = set()
for tok in _TITLE_TOKEN_RE.findall(question):
t = tok.lower()
if (
t in _TITLE_STOPWORDS
or len(t) <= 1
or t in seen
):
continue
seen.add(t)
qtokens_ordered.append(_stem_token_for_match(t))
if len(qtokens_ordered) < 2:
return hits
for h in hits:
if not h.title:
continue
# Title tokens in TITLE order, deduped on first occurrence.
ttokens_ordered: list[str] = []
title_seen: set[str] = set()
for tok in _TITLE_TOKEN_RE.findall(h.title.replace("_", " ")):
t = tok.lower()
if (
t in _TITLE_STOPWORDS
or len(t) <= 1
or t in title_seen
):
continue
title_seen.add(t)
ttokens_ordered.append(_stem_token_for_match(t))
if not ttokens_ordered:
continue
match_len = _ordered_match_length(qtokens_ordered, ttokens_ordered)
if match_len >= 2:
h.score = h.score * (1.0 + 0.5 * (match_len - 1))
hits.sort(key=lambda h: -h.score)
return hits
def _rerank_by_body_coverage(
hits: list,
question: str,
*,
weight: float = 0.6,
) -> list:
"""Boost score by per-token body coverage to counteract BM25's short-doc bias.
BM25 normalizes by document length, but its `b` parameter under-penalizes
*very* short docs that happen to mention every query token. Result: a
1-paragraph stub on Intel 4040 (1974 microcontroller) outscored a 30-page
Intel Core i7 article for "what is the fastest intel CPU?".
Approach: for each hit's full body, sum sqrt(count) per query token.
Sqrt scaling lets long topical articles meaningfully out-score short
tangential ones without runaway domination by enumerative list pages.
stub article: 5 intel + 1 fastest + 4 cpu -> sqrt(5)+sqrt(1)+sqrt(4) ~ 5.2
long topical: 200 + 10 + 80 -> sqrt(200)+sqrt(10)+sqrt(80) ~ 26.2
enumeration: 1000 + 50 + 300 -> sqrt(1000)+sqrt(50)+sqrt(300) ~ 55.8
Multiply by `weight` (default 0.6) and add to the existing score. The
differentiation in the example above is decisive but bounded: long topical
articles get +15-16, enumerations get +33, stubs get +3. Combined with
FTS5 base scores in the 40s, the long topical article wins comfortably.
Cost: one body fetch per surviving candidate (typically 24-32). Bounded.
"""
import math
qtokens_lower = {t.lower() for t in _title_query_tokens(question)}
if not qtokens_lower:
return hits
for h in hits:
body = _load_doc_text(h.shard_path, h.document_root)
if body is None:
continue
body_lower = body.lower()
coverage = sum(
math.sqrt(body_lower.count(t)) for t in qtokens_lower
)
h.score += coverage * weight
hits.sort(key=lambda h: -h.score)
return hits
def _load_doc_text(shard_path: str, document_root: str) -> str | None:
"""Concatenate all hot chunks of a document. Returns None if cold or missing."""
conn = connect(shard_path)
try:
rows = conn.execute(
"SELECT content FROM chunks "
"WHERE document_root = ? AND content IS NOT NULL "
"ORDER BY idx ASC",
(document_root,),
).fetchall()
finally:
conn.close()
if not rows:
return None
return "\n\n".join(unpack_chunk(r["content"]) or "" for r in rows)
def _load_doc_chunks(
shard_path: str, document_root: str
) -> list[tuple[int, str, str]] | None:
"""Per-chunk hot rows of a document.
Returns ``[(chunk_idx, leaf_hash, span), ...]`` in chunk order, or
``None`` if the document is cold or absent. Cold individual chunks
are skipped (the WHERE clause filters NULL content). Used by the
claim-lattice-pointer evidence-map builder to emit one
``EvidenceObject`` per chunk so the model can cite the specific
paragraph that supports a claim instead of lazy-anchoring the
whole article.
"""
conn = connect(shard_path)
try:
rows = conn.execute(
"SELECT idx, leaf_hash, content FROM chunks "
"WHERE document_root = ? AND content IS NOT NULL "
"ORDER BY idx ASC",
(document_root,),
).fetchall()
finally:
conn.close()
if not rows:
return None
out: list[tuple[int, str, str]] = []
for r in rows:
span = unpack_chunk(r["content"]) or ""
if not span:
continue
out.append((r["idx"], r["leaf_hash"], span))
return out or None
def _extract_preflight_hash_from_blob(blob: str | None) -> str | None:
"""Pull the ``preflight`` stage hash out of a persisted
``run_dag_blob`` (Ticket #000009 §7.2). Returns ``None`` when
the blob is absent / unparseable / lacks a preflight stage —
legacy rows written before #000009 fall through this path
cleanly without raising.
"""
if not blob:
return None
try:
parsed = json.loads(blob) if isinstance(blob, str) else blob
except (json.JSONDecodeError, TypeError):
return None
if not isinstance(parsed, dict):
return None
for node in parsed.get("nodes") or []:
if isinstance(node, dict) and node.get("stage") == "preflight":
return node.get("hash")
return None
def _context_root(source_roots: list[str]) -> str:
"""Merkle root over sorted source document_roots — the v9.8 'source' dim
for multi-source answers. Sorting makes the root deterministic regardless
of search ranking order."""
if not source_roots:
return "00" * 32
sorted_roots = sorted(source_roots)
if len(sorted_roots) == 1:
return sorted_roots[0]
leaves = [bytes.fromhex(r) for r in sorted_roots]
return MerkleTree.build(leaves).root.hex()
[docs]
def query(
*,
question: str,
qa_db: Path,
chat_client: ChatClient,
model_id: str,
revision: str = "",
quantization: str = "",
shards_dir: Path | None = None,
single_db: Path | None = None,
top_k: int = 8,
over_fetch: int = 32,
max_context_chars: int | None = None,
policy: dict | None = None,
chain: str = "private",
fidelity: str | None = None,
burn_existing: bool = False,
retrieval_keywords: str | None = None,
translator: object | None = None,
progress: Progress | None = None,
extra_body: dict | None = None,
) -> dict:
"""Answer `question` using the corpus. Cache to qa_db. Returns a result dict.
`fidelity` controls lookup tolerance — see ``FIDELITY_MODES`` in
``arborist.qa.keys``. ``"strict"`` checks only the cache_key
matching this call's ``policy["question_dedup"]``. ``"equivalence_class"``
(default) tries the primary cache_key first, then the alternate
dedup-mode cache_key as a fallback so a fast-cache agent can reuse
a record written under either mode. Result includes ``lookup_path``
naming which key matched (or ``"miss"`` when the LLM ran).
`burn_existing=True` deletes the matching live providence_cache row
(under the primary dedup-mode cache_key) BEFORE the cache lookup,
forcing a fresh inference. Each burn writes a ``providence_burn``
audit event. Test-ergonomic: run `make query Q=... BURN=1` after
tweaking a knob to see the new behavior without finding cache_keys
by hand. Result includes ``burned_existing`` reporting how many
rows were deleted (0 or 1 for the primary key; the equivalence-
class fallback key is left alone so prior alt-mode records stay
historic).
`retrieval_keywords` augments the FTS5 search and title-filter
token set with operator-supplied keywords WITHOUT changing what
the LLM sees as its question or what the verifier checks.
Empirically observed 2026-05-01: long discursive questions like
'what technology is currently or soon available which may enable
one person to reconstruct another person's thoughts...' under-
retrieve because their content tokens get diluted by template
phrasing. Appending domain keywords ('transcranial knowledge
acquisition') narrows OR-mode FTS5 to the topical article
(Neurotechnology) and lifts the verdict from HYBRID to STRICT.
Keywords do NOT enter ``question_hash`` directly, but they DO
change which sources get chosen — and that re-routes the
``context_root`` and ``conversation_hash`` components of
``cache_key``. Two calls with the same question and different
keywords therefore land under different cache_keys (different
contexts, different cached records — correctly so). Pair with
``burn_existing=True`` to force fresh inference when iterating
on keyword sets.
"""
policy = policy or DEFAULT_QUERY_POLICY
progress = progress or _progress_disabled()
if fidelity is None:
fidelity = policy.get("fidelity", DEFAULT_FIDELITY)
if fidelity not in FIDELITY_MODES:
raise ValueError(
f"fidelity must be one of {FIDELITY_MODES}, got {fidelity!r}"
)
answer_mode = policy.get("answer_mode", DEFAULT_ANSWER_MODE)
if answer_mode not in ANSWER_MODES:
raise ValueError(
f"policy['answer_mode'] must be one of {ANSWER_MODES}, got {answer_mode!r}"
)
# Resolve per-mode context budget when the caller didn't pass one
# explicitly. Sprint 1b (2026-05-02) — different answer modes peak
# at different budgets; quote/pointer at 24 KB, JSON at 48 KB.
# Explicit caller value always wins (backward-compatible).
if max_context_chars is None:
by_mode = policy.get("max_context_chars_by_mode") or {}
max_context_chars = int(
by_mode.get(answer_mode, policy.get("max_context_chars", 60000))
)
# Ticket #000001 §7 Phase 0 — deterministic cross-language guard.
# Retrieval-side only; never touches verifier / audit_mode /
# cache_key / governance_policy_hash / question_hash. English path
# is byte-identical: `guard()` returns None for any pure-ASCII
# query without inverted punctuation, so `_xlang` stays None and
# nothing below changes. When the non-English signal fires and no
# corpus-language content token survives the es-v1 function-word
# filter, fail closed to UNGROUNDED before preflight/retrieval/LLM
# (mirrors the `quantifier_should_reject` reject-DAG path so the
# rejection stays Merkle-auditable). The non-fail-closed case is
# handled at the `retrieval_query` construction below.
# Single gate for BOTH §7 seams (fail-closed below + retrieval_query
# strip later): flag OFF → `_xlang` is None → both blocks skip →
# behaviour reverts byte-for-byte to pre-#000001-§7, giving a clean
# A/B baseline for experimentation.
from arborist.qa.crosslang import guard as _xlang_guard
_xlang = (
_xlang_guard(question)
if policy.get("crosslang_guard_enabled", False)
else None
)
if _xlang is not None and _xlang.fail_closed:
from arborist.qa.dag import (
build_reject_run_dag as _xl_build_dag,
_canonical_json as _xl_canon,
_sha256_hex as _xl_sha,
)
_xl_vmethod = (
"claim_lattice_pointer"
if answer_mode == "claim_lattice_pointer"
else "claim_lattice"
if answer_mode == "claim_lattice"
else "quote"
)
_xl_qhash = question_hash(
question, mode=policy.get("question_dedup", "equivalence_class"),
)
_xl_violations = [{
"kind": "CROSS_LANGUAGE_UNSUPPORTED",
"signal": "non_english_punctuation_or_script",
"stoppack": "es-v1",
"dropped_tokens": list(_xlang.dropped),
"content_tokens": list(_xlang.content_tokens),
"reason": _xlang.reason,
}]
_xl_answer_text = (
"CROSS-LANGUAGE PREFLIGHT — UNGROUNDED\n\n"
"This query is not in the corpus language and no cross-"
"language bridge is enabled (ticket #000001 §7). Ask in "
"English, or wait for the MT retrieval route (Phase 1, "
"gated on the provider decision)."
)
_xl_preflight_payload = {
"guard": "crosslang-v1",
"signal": True,
"stoppack": "es-v1",
"dropped_tokens": list(_xlang.dropped),
"content_tokens": list(_xlang.content_tokens),
}
_xl_preflight_hash = _xl_sha(_xl_canon(_xl_preflight_payload))
_xl_run_dag = _xl_build_dag(
question_hash=_xl_qhash,
preflight_hash=_xl_preflight_hash,
preflight_payload=_xl_preflight_payload,
rejection_reason=_xlang.reason,
answer_text=_xl_answer_text,
audit_mode="UNGROUNDED",
verifier_method=_xl_vmethod,
violations=_xl_violations,
)
return {
"status": "cross_language_unsupported",
"audit_mode": "UNGROUNDED",
"cache_key": None,
"lookup_path": "preflight",
"run_dag_root": _xl_run_dag["root"],
"run_dag_blob": json.dumps(_xl_run_dag, separators=(",", ":")),
"preflight_hash": _xl_preflight_hash,
"answer_text": _xl_answer_text,
"sources": [],
"n_quotes": 0,
"n_verified": 0,
"verifier_method": _xl_vmethod,
"unverified_quotes": [],
"partially_verified_quotes": [],
"violations": _xl_violations,
}
# Ticket #000056 — Operation Sandwich. Edge IN: when the Phase-0
# signal fired (non-English) and there IS groundable content (not
# fail-closed) and translation is opted in, translate the query
# es→en so the English corpus ranks primary AND the LLM is prompted
# in English (so it answers in English → the UNTOUCHED verifier
# grounds English-vs-English). `question` (Spanish) is left
# untouched: it remains the user's question for `question_hash` /
# cache identity. The translation is a retrieval/prompt transform
# whose engine identity binds into the run-DAG retrieval plan
# (RetrievalPlan.mt_*), exactly the `--retrieval-keywords` status.
# `_mt` graceful-degrades (no `[mt]` extra → available False →
# `_sandwich_en_q` stays None → behaviour falls back to Phase-0).
_sandwich_en_q: str | None = None
_mt = None
_src_lang = "es"
if (
_xlang is not None
and not _xlang.fail_closed
and policy.get("crosslang_translate_enabled", False)
):
from arborist.qa.mt import get_translator
_mt = translator or get_translator()
if policy.get("crosslang_entity_mask", False): # #000056 §9: net-negative, default off
from arborist.qa.mt.entity_mask import MaskedTranslator
_mt = MaskedTranslator(_mt)
_src_lang = policy.get("crosslang_source_lang", "es")
_cand = _mt.translate(question, _src_lang, "en")
if getattr(_mt, "available", False) and _cand and _cand != question:
_sandwich_en_q = _cand
# The text the LLM is prompted with (English when the sandwich is
# active; the user's original question otherwise). NEVER feeds
# question_hash.
llm_question = _sandwich_en_q or question
# Quantifier preflight (Ticket #000008 Phase 1+2). Phase 1 runs
# the lexical classifier; Phase 2 looks up the per-model cap.
# The cap is REPORTED on the result dict (claim_cap_applied) but
# only applied to the verifier when quantifier_guard_apply_caps
# is True (default False through dry-run rollout per §10.11.3).
# Six-level disable hierarchy gates each step: master
# (quantifier_guard_enabled), per-mode (quantifier_guard_modes),
# per-call (quantifier_caps_by_intensity overrides), per-test
# (policy={"quantifier_guard_enabled": False}).
from arborist.qa.model_profiles import cap_for_intensity
from arborist.qa.quantifier import classify_question_quantifier
quantifier_guard_on = bool(policy.get("quantifier_guard_enabled", True))
quantifier_guard_modes = policy.get(
"quantifier_guard_modes",
["claim_lattice_pointer", "claim_lattice"],
)
quantifier_mode_gated = answer_mode in (quantifier_guard_modes or [])
if quantifier_guard_on:
quantifier = classify_question_quantifier(question)
else:
# Master kill — emit a stub so the result schema stays
# consistent. Bench rows can still distinguish "guard off"
# from "no classification" via quantifier_intensity=None.
quantifier = {
"intensity": None,
"matched_token": None,
"explicit_count": None,
"is_broad": False,
"operational_shape": None,
"scope_bound_hint": "unknown",
"classifier_version": None,
}
if quantifier_guard_on and quantifier_mode_gated and quantifier["intensity"]:
claim_cap_lookup = cap_for_intensity(
model_profile_id=model_id,
intensity=quantifier["intensity"],
explicit_count=quantifier["explicit_count"],
policy_overrides=policy.get("quantifier_caps_by_intensity") or None,
)
else:
claim_cap_lookup = None
# Effective cap that the verifier will see. Dry-run mode
# (apply_caps=False) preserves the policy default; once an
# operator flips apply_caps=True, the looked-up cap shadows the
# default for this call only — but ONLY for modes in
# quantifier_apply_caps_modes. n=5 bench (#000008 §12.10) found
# cap-on-pointer fires TOO_MANY_CLAIMS 20× without moving the
# 0/45 STRICT floor, while cap-on-JSON wins +14pp. Default
# allowlist is ["claim_lattice"] (JSON only); empty/None falls
# back to all guard_modes.
# The full cap fallback chain reads:
# 1. quantifier-guard cap (when apply_caps=True AND mode allowed)
# 2. claim_lattice_max_claims_per_answer policy field
# 3. hard-coded default 12
quantifier_apply_caps = bool(policy.get("quantifier_guard_apply_caps", False))
quantifier_apply_caps_modes = policy.get(
"quantifier_apply_caps_modes",
quantifier_guard_modes, # legacy fallback
) or quantifier_guard_modes
quantifier_caps_mode_gated = answer_mode in (quantifier_apply_caps_modes or [])
_policy_max_claims = int(policy.get("claim_lattice_max_claims_per_answer", 12))
if (
quantifier_apply_caps
and quantifier_caps_mode_gated
and claim_cap_lookup is not None
):
effective_max_claims = int(claim_cap_lookup)
else:
effective_max_claims = _policy_max_claims
# Ticket #000010 — meta-cognition preflight. Pure deterministic
# classifier wraps the quantifier output plus four new detectors
# (temporal, contradiction, false-premise-lite, out-of-corpus).
# Surfaces a QuestionState on the result dict; first pass does
# NOT bind into run_dag_root (deferred to Phase 5 / ticket #000009
# where the quantifier_preflight node lands too — both nodes can
# land together to keep the run-DAG schema atomic). Reference
# frames not yet plumbed through (frame_detection runs after
# retrieval, and preflight here is pre-retrieval — frame info
# lives on the result dict separately, not on QuestionState
# in this pass).
from arborist.qa.metacognition import preflight_question
progress.emit("preflight.start", mode=answer_mode)
_t_preflight = time.monotonic()
question_state = preflight_question(
question,
model_profile_id=model_id,
reference_frames=(),
policy=policy,
)
preflight_ms = _ms_since(_t_preflight)
progress.emit(
"preflight.done",
is_broad=bool(quantifier.get("is_broad")),
intensity=quantifier.get("intensity"),
ms=int(preflight_ms),
)
# Ticket #000011 — optional soft preflight sidecar. Default OFF;
# one short LLM round-trip when policy["soft_preflight_enabled"]
# is True. Returns a stub hint (SOFT_DISABLED) when off so the
# result-dict / run-DAG schema stays consistent. NEVER enters
# the verifier proof path; advisory only.
from arborist.qa.soft_preflight import soft_preflight_question
_t_soft_preflight = time.monotonic()
soft_hint = soft_preflight_question(
question,
chat_client=chat_client,
model_id=model_id,
policy=policy,
)
soft_preflight_ms = _ms_since(_t_soft_preflight)
# Ticket #000008 Phase 4 — strict reject for broad-unbounded.
# When opt-in via policy / --reject-broad CLI flag, return
# UNGROUNDED before the LLM call for ALL/COMPREHENSIVE/
# OPEN_REQUEST + scope_bound_hint==unbounded shapes. Bounded
# universals (scope_bound_hint==bounded) are NOT rejected per
# §10.1 — those are answerable. Saves the ~10-15s LLM call on
# rejected runs.
quantifier_reject_broad = bool(policy.get("quantifier_reject_broad", False))
quantifier_should_reject = (
quantifier_guard_on
and quantifier_mode_gated
and quantifier_reject_broad
and quantifier.get("is_broad")
and quantifier.get("scope_bound_hint") == "unbounded"
)
if quantifier_should_reject:
# Early-return without an LLM call. Skips retrieval cost too —
# we already know the answer set is undefined. Result schema
# mirrors a normal UNGROUNDED row so bench/CLI rendering
# stays consistent.
# Ticket #000009 §8.2 / feedback §6.2: build a 3-stage
# reject-broad DAG so the rejection is Merkle-auditable.
# Without this, two rejections under different policy state
# would be indistinguishable in audit replay.
from arborist.qa.dag import (
build_reject_run_dag,
preflight_node_hash as _pre_hash,
)
# question_hash / verifier_policy_hash / model_profile_hash
# already imported at module top; do NOT re-import locally
# (would shadow free-variable uses elsewhere in this function).
_reject_qhash = question_hash(
question,
mode=policy.get("question_dedup", "equivalence_class"),
)
_reject_ghash = verifier_policy_hash(policy)
_reject_mhash = model_profile_hash(model_id, revision, quantization)
_reject_rationale = (
"preflight rejection — broad-quantifier query with "
"unbounded scope. Operator opted in via "
"quantifier_reject_broad policy."
)
_reject_violations = [{
"kind": "BROAD_QUANTIFIER_REJECTED",
"intensity": quantifier["intensity"],
"matched_token": quantifier["matched_token"],
"scope_bound_hint": quantifier["scope_bound_hint"],
"reason": _reject_rationale,
}]
_reject_answer_text = (
"BROAD-QUANTIFIER PREFLIGHT REJECTED · scope unbounded\n\n"
f"Question matched {quantifier['intensity']} intensity "
f"(\"{quantifier['matched_token']}\") with an under-"
"specified universe. Narrow the question (e.g. add a "
"year, league, country, or category) or run with "
"--allow-broad for exploratory enumeration."
)
# Same payload-then-hash pattern as the miss path so
# `--show-preflight` can render the full clause set on
# reject rows too.
from arborist.qa.dag import (
_canonical_json as _reject_canon,
_sha256_hex as _reject_sha,
build_preflight_node_payload as _reject_build_payload,
)
_reject_preflight_payload = _reject_build_payload(
question_state=question_state.to_dict(),
quantifier=quantifier,
answer_contract={
"guard_enabled": quantifier_guard_on,
"mode_gated": quantifier_mode_gated,
"apply_caps_active": quantifier_apply_caps,
"apply_caps_mode_gated": quantifier_caps_mode_gated,
"claim_cap_resolved": claim_cap_lookup,
"claim_cap_applied": None, # cap never reaches verifier on reject
"manual_quotes_allowed": False,
"evidence_pointer_required": True,
"allow_unbounded_enumeration": False,
"reject_broad_active": True,
"metacognition_enabled": bool(
policy.get("metacognition_enabled", True)
),
"block_on_contradiction": bool(
policy.get("metacognition_block_on_contradiction", False)
),
},
prompt_contract={
# Rejection skips the LLM, so no reminder ever fires.
"reminder_enabled": bool(
policy.get("quantifier_reminder_enabled", False)
),
"reminder_injected": False,
"reminder_template_id": None,
},
evidence_contract={
"max_evidence_ids_exposed": int(policy.get(
"claim_lattice_max_pointers_per_claim", 2
)),
"one_claim_per_line": True,
},
policy_refs={
"governance_policy_hash": _reject_ghash,
"model_profile_hash": _reject_mhash,
"answer_mode": answer_mode,
},
)
_reject_preflight_hash = _reject_sha(
_reject_canon(_reject_preflight_payload)
)
_reject_run_dag = build_reject_run_dag(
question_hash=_reject_qhash,
preflight_hash=_reject_preflight_hash,
preflight_payload=_reject_preflight_payload,
rejection_reason=_reject_rationale,
answer_text=_reject_answer_text,
audit_mode="UNGROUNDED",
verifier_method=(
"claim_lattice_pointer"
if answer_mode == "claim_lattice_pointer"
else "claim_lattice"
if answer_mode == "claim_lattice"
else "quote"
),
violations=_reject_violations,
)
return {
"status": "broad_quantifier_rejected",
"audit_mode": "UNGROUNDED",
"cache_key": None,
"lookup_path": "preflight",
# Audit binding: reject path now carries its own
# 3-stage run_dag (question → preflight → final_label)
# so audit replay can read the rejection from
# run_dag_blob the same way it reads any other row.
"run_dag_root": _reject_run_dag["root"],
"run_dag_blob": json.dumps(_reject_run_dag, separators=(",", ":")),
"preflight_hash": _reject_preflight_hash,
"answer_text": _reject_answer_text,
"sources": [],
"n_quotes": 0,
"n_verified": 0,
"verifier_method": "claim_lattice_pointer"
if answer_mode == "claim_lattice_pointer"
else "claim_lattice"
if answer_mode == "claim_lattice"
else "quote",
"unverified_quotes": [],
"partially_verified_quotes": [],
"violations": _reject_violations,
"format_collapsed": None,
"raw_answer": None,
"quantifier_intensity": quantifier["intensity"],
"quantifier_matched_token": quantifier["matched_token"],
"scope_bound_hint": quantifier["scope_bound_hint"],
"quantifier_explicit_count": quantifier["explicit_count"],
"claim_cap_applied": claim_cap_lookup,
# Ticket #000010 — meta-cognition QuestionState surfaced
# for bench / CLI render. Preflight rejection path still
# returns its own status; this is the upstream classifier
# output regardless of guard outcome.
"question_state": question_state.to_dict(),
"pointer_id_distribution": None,
"lazy_anchor_ratio": None,
"retrieval_purity": None,
"prompt_chars": {
"system_prompt": 0,
"grounding_reminder": 0,
"user_question": len(question),
"evidence_or_context": 0,
"messages_total": 0,
},
"answer_chars": 0,
"frame_detection": None,
"burned_existing": 0,
"context_root": None,
"timings": {
"search_ms": 0.0,
"context_ms": 0.0,
"cache_lookup_ms": 0.0,
"llm_ms": None,
"persist_ms": None,
# Preflight rejection runs in <1ms — record 0.0
# rather than re-fetching wall-time. The point of
# the path is to NOT spend wall time.
"total_ms": 0.0,
},
}
t_start = time.monotonic()
# 0. Canonical-projection preflight.
#
# Math- or logic-shaped questions answer exactly, deterministically,
# without retrieval or LLM call. Pure-arithmetic ("0.1 + 0.2") goes
# to arithmetic@v1; pure-propositional ("A IMPL B") to
# logic-kernel@v1. On match, we short-circuit with a synthetic
# CANONICAL_PROJECTION audit_mode — the canonical bytes ARE the
# answer. On no-match or PiStarError we fall through to RAG with
# zero side-effects. Disable per-call via
# policy["canonical_projection_preflight"] = False.
canonical_preflight_on = bool(
policy.get("canonical_projection_preflight", True)
)
if canonical_preflight_on:
_t_canon = time.monotonic()
canonical_match = _canonical_projection_preflight(question)
canonical_ms = _ms_since(_t_canon)
if canonical_match is not None:
pi_star_ref, canonical_bytes = canonical_match
answer_text = canonical_bytes.decode("utf-8", errors="replace")
progress.emit(
"canonical_projection.hit",
pi_star_ref=pi_star_ref,
ms=int(canonical_ms),
)
# Ticket #000027 — persist canonical answers to
# providence_cache. Default ON; operators can disable per
# call via policy["canonical_projection_preflight_persist"]
# = False (keeps the legacy transient render-only behavior
# for tests / probes / scripts that don't want audit-chain
# entries for math questions).
canonical_persist_on = bool(
policy.get("canonical_projection_preflight_persist", True)
)
ckey: str | None = None
audit_event_hash_v: str | None = None
run_dag_root_v: str | None = None
run_dag_blob_v: str | None = None
cached_row = None
lookup_path_v = "preflight_canonical"
status_v = "canonical_projection"
if canonical_persist_on:
from arborist.qa.canonical_cache import (
canonical_cache_key,
increment_hit_count,
lookup_canonical,
persist_canonical,
)
primary_dedup = policy.get(
"question_dedup", "equivalence_class"
)
ckey = canonical_cache_key(
question=question,
pi_star_ref=pi_star_ref,
policy=policy,
mode=primary_dedup,
)
qa_conn = connect(qa_db)
try:
cached_row = lookup_canonical(qa_conn, ckey)
if cached_row is not None:
# Trust the row — kernel-version drift is
# handled by pi_star_ref bumping (synthetic
# source_root changes), not by re-running the
# kernel on hit.
answer_text = cached_row["answer_text"]
run_dag_root_v = cached_row["run_dag_root"]
run_dag_blob_v = cached_row["run_dag_blob"]
audit_event_hash_v = cached_row["audit_event_hash"]
increment_hit_count(qa_conn, ckey)
lookup_path_v = "canonical_cache_hit"
status_v = "cache_hit"
progress.emit(
"cache.hit", lookup_path="canonical"
)
else:
progress.emit("cache.miss", lookup_path="canonical")
audit_event_hash_v, run_dag_root_v, run_dag = (
persist_canonical(
qa_conn,
cache_key_value=ckey,
question=question,
pi_star_ref=pi_star_ref,
canonical_input_bytes=question.encode(
"utf-8"
),
canonical_output_bytes=canonical_bytes,
policy=policy,
chain=chain,
mode=primary_dedup,
)
)
run_dag_blob_v = json.dumps(
run_dag, separators=(",", ":")
)
lookup_path_v = "canonical_cache_miss"
status_v = "cache_miss_then_written"
finally:
qa_conn.close()
# Ticket #000028 — multi-modality witness. Default OFF;
# operator opts in via policy["canonical_witness_enabled"]
# (CLI: --witness). Fans out cache + LLM in parallel against
# the kernel ground truth and records cross-modality
# agreement. Cache leg uses the real persisted-row bytes
# (post-#000027) when persistence is on; otherwise None.
#
# #000028 follow-up — sample rate gating. When enabled,
# `canonical_witness_sample_rate` (default 1.0) chooses
# the fraction of canonical-shape calls that actually
# fire the witness. Operators wanting passive calibration
# set sample_rate=0.05 to pay 5% of the LLM cost while
# still collecting divergence data. 1.0 = always-on
# (current behavior); 0.0 = effectively disabled.
witness_dict = None
witness_skipped_reason: str | None = None
if bool(policy.get("canonical_witness_enabled", False)):
_sample_rate = float(
policy.get("canonical_witness_sample_rate", 1.0)
)
_sample_rate = max(0.0, min(1.0, _sample_rate))
if _sample_rate < 1.0:
import random as _random
_fired = _random.random() < _sample_rate
else:
_fired = True
if not _fired:
witness_skipped_reason = "sampled_out"
progress.emit(
"witness.skipped", reason="sampled_out",
sample_rate=_sample_rate,
)
else:
from arborist.qa.witness import run_witness
# Cache-leg closure: returns the persisted answer
# bytes if we found a prior row at the top of this
# branch (so the witness compares against the
# already-committed canonical answer), else None.
# We use the row from BEFORE we wrote — comparing
# against a row we just wrote in the same call would
# be tautological.
_cached_bytes = (
cached_row["answer_text"].encode(
"utf-8", errors="surrogatepass"
)
if cached_row is not None
else None
)
_cache_lookup = lambda: _cached_bytes # noqa: E731
_witness = run_witness(
question=question,
pi_star_ref=pi_star_ref,
canonical_answer_bytes=canonical_bytes,
cache_lookup=_cache_lookup,
chat_client=chat_client,
model_id=model_id,
timeout_s=float(
policy.get("canonical_witness_timeout_s", 10.0)
),
progress=progress,
)
witness_dict = _witness.to_dict()
# Witness audit event (#000028 follow-up). Records
# the fan-out result on the audit chain so a
# downstream extractor (`make
# bench-witness-divergence`) can pull divergence
# events as 5F-Falsification calibration fixtures.
# Best-effort: chain-write failure must never fail
# the query.
if witness_dict:
try:
from arborist.store import (
append_audit as _append_audit,
)
_wconn = connect(qa_db)
try:
_append_audit(
_wconn,
event_type="providence_canonical_witness",
subject_root=ckey,
body={
"pi_star_ref": pi_star_ref,
"question_text": question,
"agreement_label": witness_dict.get(
"agreement_label"
),
"canonical_answer_text": (
canonical_bytes.decode(
"utf-8", errors="replace"
)
),
"llm_raw_text": (
witness_dict.get("modalities", {})
.get("llm", {})
.get("raw_answer")
),
"llm_canonical_bytes": (
witness_dict.get("modalities", {})
.get("llm", {})
.get("canonical_bytes")
),
"cache_status": (
witness_dict.get("modalities", {})
.get("cache", {})
.get("error")
or "ok"
),
},
)
finally:
_wconn.close()
except Exception: # pragma: no cover
pass
# Capital-ledger record (#000028 follow-up). Witness
# mode adds one full LLM call per fired canonical
# question; record the cost so ForkScore (#000012)
# can compare witness-on vs witness-off forks
# honestly + so operators can budget. Best-effort:
# ledger-write failure must never fail the query.
if (
audit_event_hash_v
and witness_dict
and witness_dict.get("modalities", {}).get("llm")
):
try:
from arborist.capital import store as _capstore
from arborist.capital.profile import profile_for_op
_llm = witness_dict["modalities"]["llm"]
_llm_seconds = float(
_llm.get("elapsed_ms", 0)
) / 1000.0
_profile, _inputs = profile_for_op(
"qa",
{
"prompt_chars": len(question),
"answer_chars": len(
_llm.get("raw_answer") or ""
),
"llm_seconds": _llm_seconds,
"cache_hit": False,
},
)
_conn = connect(qa_db)
try:
_capstore.record(
_conn,
audit_event_hash=audit_event_hash_v,
op_type="canonical_witness",
profile=_profile,
estimator_inputs={
**_inputs,
"agreement_label": witness_dict.get(
"agreement_label"
),
"pi_star_ref": pi_star_ref,
},
)
finally:
_conn.close()
except Exception: # pragma: no cover
# Sidecar discipline: ledger never blocks.
pass
return {
"status": status_v,
"audit_mode": "CANONICAL_PROJECTION",
"verifier_method": "canonical_projection",
"pi_star_ref": pi_star_ref,
"answer_text": answer_text,
"n_quotes": 1,
"n_verified": 1,
"violations": [],
"lookup_path": lookup_path_v,
"sources": [],
"cache_key": ckey,
"audit_event_hash": audit_event_hash_v,
"run_dag_root": run_dag_root_v,
"run_dag_blob": run_dag_blob_v,
"witness": witness_dict,
"timings": {
"canonical_preflight_ms": canonical_ms,
"total_ms": _ms_since(t_start),
},
}
# 1. Search.
#
# Retrieval-only query string: question + operator-supplied
# ``retrieval_keywords`` (a hint, never part of the cache_key /
# LLM prompt / verifier surface). When the user passes
# `--retrieval-keywords "transcranial knowledge acquisition"`,
# only the FTS5 MATCH and title-filter token set see those
# tokens; the question text fed to the LLM and to question_hash
# stays untouched.
retrieval_query = question
# Ticket #000001 §7 Phase 0 — non-fail-closed cross-language case
# (signal fired, a content token survived: the `anarcocapitalismo`
# field shape). Strip es-v1 function words from the RETRIEVAL
# string only so they cannot drive an OR-mode full-corpus FTS5
# scan (the measured 9.9 s cost). question / question_hash / the
# LLM prompt / the verifier surface are untouched. English path:
# `_xlang` is None → unchanged.
if _xlang is not None and not _xlang.fail_closed:
from arborist.qa.crosslang import strip_for_retrieval
retrieval_query = strip_for_retrieval(question, _xlang)
# Ticket #000056 — Operation Sandwich edge IN. A real es→en
# translation is strictly better than the Phase-0 stoppack-strip
# (the English article ranks *primary*, not background). Still
# retrieval-side only; `question`/`question_hash` untouched.
if _sandwich_en_q:
retrieval_query = _sandwich_en_q
if retrieval_keywords and retrieval_keywords.strip():
retrieval_query = f"{retrieval_query} {retrieval_keywords.strip()}"
progress.emit(
"search.start",
top_k=int(top_k),
over_fetch=int(over_fetch),
keywords_appended=bool(retrieval_keywords and retrieval_keywords.strip()),
)
t_phase = time.monotonic()
hits, core_match_roots, phrase_match_roots, root_to_shard = _search_corpus(
shards_dir, single_db, retrieval_query, over_fetch, progress=progress,
)
if not hits:
return {
"status": "no_sources",
"msg": "FTS5 search returned no hits",
"timings": {
"search_ms": _ms_since(t_phase),
"total_ms": _ms_since(t_start),
},
}
qtokens_lower = {t.lower() for t in _title_query_tokens(retrieval_query)}
_bd_lookups = [0]
def _body_density_check(h) -> bool:
# Lazy per-hit check: open the shard, count token mentions in this doc.
sp = root_to_shard.get(h.document_root) or h.shard_path
if not sp:
return False
_bd_lookups[0] += 1
c = connect(sp)
try:
return _body_density_passes(c, h.document_root, qtokens_lower)
finally:
c.close()
hits = _rerank(
hits,
retrieval_query,
core_match_roots=core_match_roots,
body_density_check=_body_density_check,
phrase_match_roots=phrase_match_roots,
hyphen_fold_anchors=_hyphen_fold_variants(retrieval_query),
shards_dir=shards_dir,
progress=progress,
)
search_ms = _ms_since(t_phase)
progress.emit(
"search.done",
hits=len(hits),
body_density_lookups=_bd_lookups[0],
ms=int(search_ms),
)
# 2. Pull doc texts within budget.
#
# Per-source cap so a single huge document can't monopolize the
# context window. Without this, a top-ranked bibliography page
# (e.g. List_of_Batman_comics at 80 KB) consumes the entire 60 KB
# budget at hit #1 and every subsequent doc is dropped with
# char_budget <= 0 — even when the bio article is hit #2 with
# the actual answer. By default we cap each source at
# `max_context_chars // top_k` so all top_k hits land in context.
# Total context ≤ max_context_chars by construction.
t_phase = time.monotonic()
chosen: list[_Hit] = []
context_parts: list[str] = []
per_source_cap = max(1, max_context_chars // max(1, top_k))
char_budget = max_context_chars
for h in hits[:top_k]:
text = _load_doc_text(h.shard_path, h.document_root)
if not text:
continue
# source_role is already set by _rerank_by_source_role; reuse
# it for the per-source cap. Primary answer source gets 2× the
# baseline cap, noisy/sequel get 0.5×, secondary & background
# get 1×. Total context still bounded by `char_budget`.
weight = SOURCE_ROLE_BUDGET_WEIGHTS.get(h.source_role, 1.0)
hit_cap = max(1, int(per_source_cap * weight))
if len(text) > hit_cap:
text = text[:hit_cap]
if len(text) > char_budget:
text = text[:char_budget]
if not text:
continue
context_parts.append(
f"=== Source: {h.document_uri} ===\n{text}"
)
chosen.append(h)
char_budget -= len(text)
if char_budget <= 0:
break
if not chosen:
return {
"status": "no_sources",
"msg": "top-k hits had cold or empty content",
"timings": {
"search_ms": search_ms,
"context_ms": _ms_since(t_phase),
"total_ms": _ms_since(t_start),
},
}
context = "\n\n".join(context_parts)
# Wikitext → prose before the LLM sees it. The model can then quote
# verbatim against the prose form; the verifier compares like-against-
# like. Idempotent if context is already plain prose. Gated on
# policy["base_version"] so this is part of governance_policy_hash.
if policy.get("base_version") and _wikitext_to_base is not None:
context = _wikitext_to_base(context)
context_ms = _ms_since(t_phase)
progress.emit(
"context_assembly",
chosen=len(chosen),
chars=len(context),
ms=int(context_ms),
)
# 3. Build messages + hashes. Branch on answer_mode:
# "quote" (default) raw sources block + verbatim-quote rules.
# "claim_lattice" one evidence object per chosen source, each
# labeled with a content-addressed evidence_id;
# model emits JSON referencing IDs.
evidence_map = []
if answer_mode == "claim_lattice_pointer":
# G0.1 — per-chunk evidence granularity. Each retrieved source
# contributes ONE evidence object per chunk (up to the
# role-weighted per_source_cap budget) instead of one
# whole-doc span.
#
# G0.3 — query-relevance chunk ordering. Within each source,
# chunks are ranked by (distinct_query_tokens_present,
# total_mentions, chunk_idx_asc) so the chunk that most
# textually supports the question gets the lowest pointer id
# and lands at the top of the per-source evidence stack. Without
# this re-rank, Hermes-3-8B lazy-anchors on the first few
# chunks regardless of relevance — burying the actual answer
# paragraph behind irrelevant article-header text. Soft signal
# only (token overlap; no embeddings) so it stays out of the
# proof path; the verifier still runs the same hard checks
# against whatever the model picked.
qtokens_stem_for_chunks = {
_stem_token_for_match(t.lower())
for t in _title_query_tokens(question)
}
chunks_for_map: list[dict] = []
for h in chosen:
doc_chunks = _load_doc_chunks(h.shard_path, h.document_root)
if not doc_chunks:
continue
weight = SOURCE_ROLE_BUDGET_WEIGHTS.get(h.source_role, 1.0)
hit_cap = max(1, int(per_source_cap * weight))
# Score each chunk by query-token overlap. Tuple sort:
# distinct present DESC, total mentions DESC, doc order ASC.
scored = []
for chunk_idx, leaf_hash, span in doc_chunks:
distinct, total = _chunk_query_relevance(
span, qtokens_stem_for_chunks
)
scored.append(
(chunk_idx, leaf_hash, span, distinct, total)
)
scored.sort(key=lambda r: (-r[3], -r[4], r[0]))
# Greedy: fill the per-source budget with relevance-ranked
# chunks. If a single chunk exceeds what's left, truncate
# that one chunk and stop. Total context across the source
# stays bounded by hit_cap, same shape as the prose path.
# Per-source chunk cap also bounds chunk COUNT (in addition
# to char budget) so an encyclopedic article doesn't inflate
# the evidence catalog into E1-E26 territory and induce
# mega-claim failures.
max_chunks = max(1, int(policy.get(
"claim_lattice_max_chunks_per_source", 2
)))
spent = 0
chunks_used = 0
for chunk_idx, leaf_hash, span, _d, _t in scored:
if spent >= hit_cap or chunks_used >= max_chunks:
break
if policy.get("base_version") and _wikitext_to_base is not None:
span = _wikitext_to_base(span)
remaining = hit_cap - spent
if len(span) > remaining:
span = span[:remaining]
if not span:
break
chunks_for_map.append({
"source_root": h.document_root,
"document_uri": h.document_uri,
"title": h.title,
"chunk_idx": chunk_idx,
"chunk_root": leaf_hash,
"span": span,
"source_role": h.source_role,
})
spent += len(span)
chunks_used += 1
evidence_map = build_evidence_map(chunks_for_map)
sys_prompt = policy["claim_lattice_system_prompt"]
grounding_reminder = policy.get("claim_lattice_grounding_reminder")
rendered_evidence = render_evidence_map(evidence_map)
def _user_payload(q: str) -> str:
return f"EVIDENCE:\n\n{rendered_evidence}\n\n---\n\nQUESTION: {q}"
elif answer_mode == "claim_lattice":
# JSON variant — same evidence-map construction as the pointer
# path, but blocks are labeled with content-addressed
# ``evidence_id`` (long hex) since the model emits IDs in JSON.
# The lenient pre-parser in verify_claim_lattice_json keeps the
# path survivable on inference paths without grammar guidance;
# vLLM ``guided_json`` (passed via extra_body below) eliminates
# SCHEMA_INVALID failures at sampling time when available.
qtokens_stem_for_chunks = {
_stem_token_for_match(t.lower())
for t in _title_query_tokens(question)
}
chunks_for_map: list[dict] = []
for h in chosen:
doc_chunks = _load_doc_chunks(h.shard_path, h.document_root)
if not doc_chunks:
continue
weight = SOURCE_ROLE_BUDGET_WEIGHTS.get(h.source_role, 1.0)
hit_cap = max(1, int(per_source_cap * weight))
scored = []
for chunk_idx, leaf_hash, span in doc_chunks:
distinct, total = _chunk_query_relevance(
span, qtokens_stem_for_chunks
)
scored.append((chunk_idx, leaf_hash, span, distinct, total))
scored.sort(key=lambda r: (-r[3], -r[4], r[0]))
max_chunks = max(1, int(policy.get(
"claim_lattice_max_chunks_per_source", 2
)))
spent = 0
chunks_used = 0
for chunk_idx, leaf_hash, span, _d, _t in scored:
if spent >= hit_cap or chunks_used >= max_chunks:
break
if policy.get("base_version") and _wikitext_to_base is not None:
span = _wikitext_to_base(span)
remaining = hit_cap - spent
if len(span) > remaining:
span = span[:remaining]
if not span:
break
chunks_for_map.append({
"source_root": h.document_root,
"document_uri": h.document_uri,
"title": h.title,
"chunk_idx": chunk_idx,
"chunk_root": leaf_hash,
"span": span,
"source_role": h.source_role,
})
spent += len(span)
chunks_used += 1
evidence_map = build_evidence_map(chunks_for_map)
sys_prompt = policy.get(
"claim_lattice_json_system_prompt",
policy["claim_lattice_system_prompt"],
)
grounding_reminder = policy.get(
"claim_lattice_json_grounding_reminder",
policy.get("claim_lattice_grounding_reminder"),
)
rendered_evidence = render_evidence_map_for_json(evidence_map)
def _user_payload(q: str) -> str:
return f"EVIDENCE:\n\n{rendered_evidence}\n\n---\n\nQUESTION: {q}"
else:
sys_prompt = policy["system_prompt"]
grounding_reminder = policy.get("grounding_reminder")
def _user_payload(q: str) -> str:
return f"Sources:\n\n{context}\n\n---\n\nQuestion: {q}"
# Frame detection (Ticket #000002 / Module L). Lattice-mode only.
# Surfaces whether the query is allusion-shape AND the phrase
# route surfaced a reference-work source. When `reference`, the
# polarity preamble below nudges the model toward multi-frame
# answers.
#
# Body sample for the fiction-marker density check uses the
# ARTICLE LEAD (chunk_idx=0) — that's where fiction markers
# cluster on Wikipedia ("is a dystopian science fiction novel
# by..."). Reusing chunks_for_map would give us the query-
# relevant chunks (e.g. the plot section containing 'always been
# at war') which may have fewer fiction markers.
frame_detection: FrameDetection | None = None
if answer_mode in ("claim_lattice_pointer", "claim_lattice"):
sources_for_frame: list[dict] = []
seen_roots: set[str] = set()
for h in chosen:
if h.document_root in seen_roots:
continue
seen_roots.add(h.document_root)
doc_chunks = _load_doc_chunks(h.shard_path, h.document_root)
# First chunk by idx ASC — the article lead.
body_sample = doc_chunks[0][2] if doc_chunks else ""
# Wikitext-strip so fiction markers buried under
# `[[wikilinks]]` and `{{templates}}` surface in the
# density check. Bench-time policy gates the strip; we
# apply it unconditionally here since a no-op fallback
# leaves raw wikitext (and the markers still match the
# `\bnovel\b` regex even with surrounding markup).
if (
policy.get("base_version")
and _wikitext_to_base is not None
and body_sample
):
body_sample = _wikitext_to_base(body_sample)
sources_for_frame.append({
"document_root": h.document_root,
"document_uri": h.document_uri,
"title": h.title,
"body_sample": body_sample,
})
frame_detection = _detect_frame(
question, sources_for_frame, phrase_match_roots=phrase_match_roots
)
# Ticket #000010 §12.6 — refine QuestionState with frame data
# post-retrieval. Pre-retrieval preflight ran with empty
# reference_frames=() (frame detection needs source titles
# which only exist after retrieval). Now that frames are
# known, re-run the classifier so the run-DAG and result-dict
# QuestionState carry the frame-aware logical_statuses
# (specifically `reference_frame_ambiguous` when 2+ frames
# match). Pure function; cheap to re-call.
refined_frames: tuple[str, ...] = ()
if frame_detection is not None:
if frame_detection.frame_kind == "reference":
refined_frames = (
("literal_geography", frame_detection.reference_title or "reference")
if frame_detection.confidence < 1.0
else (frame_detection.reference_title or "reference",)
)
elif frame_detection.frame_kind == "ambiguous":
refined_frames = ("literal", "ambiguous_reference")
if refined_frames:
question_state = preflight_question(
question,
model_profile_id=model_id,
reference_frames=refined_frames,
policy=policy,
)
messages = [{"role": "system", "content": sys_prompt}]
# Polarity preamble for reference-frame queries (Ticket #000002).
# Injected as a user-role message BEFORE the grounding_reminder
# so the model sees the frame hint first, then the
# always-applicable structural reminder, then the actual
# evidence + question.
polarity_template = policy.get("claim_lattice_polarity_preamble", "")
if (
frame_detection is not None
and frame_detection.frame_kind == "reference"
and polarity_template
):
polarity_msg = polarity_template.format(
reference_title=frame_detection.reference_title or ""
)
messages.append({"role": "user", "content": polarity_msg})
if grounding_reminder:
messages.append({"role": "user", "content": grounding_reminder})
# Quantifier-specific reminder (Ticket #000008 Phase 3, default
# off). Same mechanism as runner.ask — see runner.py for
# rationale. Mode-gated and master-killable.
if (
quantifier_guard_on
and quantifier_mode_gated
and quantifier.get("is_broad")
and bool(policy.get("quantifier_reminder_enabled", False))
):
from arborist.qa.quantifier_reminder import broad_quantifier_reminder
broad = broad_quantifier_reminder(
intensity=quantifier["intensity"],
cap=effective_max_claims,
scope_bound_hint=quantifier["scope_bound_hint"],
)
if broad:
messages.append({"role": "user", "content": broad})
# Ticket #000056 — the LLM is prompted with `llm_question` (the
# English translation when the sandwich is active, else the user's
# original question) so it answers in English and the UNTOUCHED
# verifier grounds English-vs-English. `question_hash` / cache
# identity still derive from the original `question`.
messages.append({"role": "user", "content": _user_payload(llm_question)})
# Capacity metrics. Char-level for now — a fast model-agnostic proxy
# for prompt size (rule of thumb: ~4 chars/token for English prose,
# ~2.5 for JSON-evidence-block heavy contexts). Surfaced in the
# result dict so the bench can correlate strict-rate with input
# size and the operator can tell at a glance whether a STRICT
# verdict came from a tight 5KB prompt or a 50KB context-stuffed
# one. Never enters cache_key — these are runtime measurements,
# not policy.
if answer_mode in ("claim_lattice_pointer", "claim_lattice"):
evidence_or_context_chars = len(rendered_evidence)
else:
evidence_or_context_chars = len(context)
prompt_chars = {
"system_prompt": len(sys_prompt or ""),
"grounding_reminder": len(grounding_reminder or ""),
"user_question": len(question or ""),
"evidence_or_context": evidence_or_context_chars,
"messages_total": sum(len(m["content"]) for m in messages),
}
context_root = _context_root([h.document_root for h in chosen])
mhash = model_profile_hash(model_id, revision, quantization)
# Dedup-mode-aware cache_key build. For each mode we substitute the
# mode's canonical question form into the user message used for
# `conversation_hash` (LLM still sees the verbatim question), AND we
# vary `policy["question_dedup"]` to match the mode so
# `governance_policy_hash` matches what an agent under that mode
# would have written. This makes cross-silo fallback work: a
# strict-policy agent looking up with equivalence_class fidelity can
# find a record written by an equivalence_class-policy agent.
def _ckey_for_mode(mode: str) -> str:
canon_q = canonical_question(question, mode=mode)
canon_msgs = list(messages[:-1]) + [
{"role": "user", "content": _user_payload(canon_q)},
]
policy_variant = dict(policy, question_dedup=mode)
return cache_key(
context_root,
question_hash(question, mode=mode),
mhash,
conversation_hash(canon_msgs),
governance_policy_hash(policy_variant),
SCHEMA_VERSION,
CANONICALIZATION_VERSION,
CHUNKING_VERSION,
verifier_policy_hash(policy_variant),
)
ghash = governance_policy_hash(policy) # for the legacy INSERT below
primary_dedup = policy.get("question_dedup", DEFAULT_QUESTION_DEDUP)
if primary_dedup not in QUESTION_DEDUP_MODES:
raise ValueError(
f"policy['question_dedup'] must be one of {QUESTION_DEDUP_MODES}, "
f"got {primary_dedup!r}"
)
# Re-derive the per-mode hashes for use in the INSERT below. The legacy
# INSERT references qhash/chash by name; _ckey_for_mode already builds
# them but doesn't expose the intermediates.
qhash = question_hash(question, mode=primary_dedup)
canonical_q_primary = canonical_question(question, mode=primary_dedup)
canonical_messages_primary = list(messages[:-1]) + [
{"role": "user", "content": _user_payload(canonical_q_primary)},
]
chash = conversation_hash(canonical_messages_primary)
primary_ckey = _ckey_for_mode(primary_dedup)
ckey = primary_ckey # keep the legacy name in the rest of the function
qa_conn = connect(qa_db)
burned_existing = 0
try:
# 3.5. Optional pre-lookup burn: deletes the matching live row
# under the primary cache_key so the lookup misses & a fresh
# inference runs. Test-ergonomic — pass `burn_existing=True`
# (or `make query Q=... BURN=1`) after tweaking a knob to see
# the new behavior. The equivalence-class fallback key is
# deliberately NOT touched: prior alt-mode records stay as
# historic witnesses.
if burn_existing:
existing = qa_conn.execute(
"SELECT cache_key, audit_mode, n_verified, "
" falsification_state, question_text "
"FROM providence_cache WHERE cache_key = ?",
(primary_ckey,),
).fetchone()
if existing is not None:
with transaction(qa_conn):
qa_conn.execute(
"DELETE FROM providence_cache WHERE cache_key = ?",
(primary_ckey,),
)
append_audit(
qa_conn,
event_type="providence_burn",
subject_root=primary_ckey,
body={
"cache_key": primary_ckey,
"burned_audit_mode": existing["audit_mode"],
"burned_n_verified": int(existing["n_verified"] or 0),
"burned_state": existing["falsification_state"],
"question_text": existing["question_text"],
"reason": "query --burn (test-ergonomic mid-query bust)",
},
)
burned_existing = 1
# 4. Cache lookup. Try the primary dedup-mode cache_key first.
# If fidelity allows fallback AND the alternate dedup mode
# produces a different cache_key, try that too — lets a
# fast-cache agent reuse a record written under either mode.
progress.emit("cache_lookup", primary=primary_ckey[:8])
t_phase = time.monotonic()
cached = qa_conn.execute(
"SELECT * FROM providence_cache "
"WHERE cache_key = ? AND falsification_state = 'live'",
(primary_ckey,),
).fetchone()
hit_ckey = primary_ckey
lookup_path = primary_dedup if cached is not None else None
if cached is None and fidelity == "equivalence_class":
other_mode = (
"equivalence_class" if primary_dedup == "strict" else "strict"
)
other_ckey = _ckey_for_mode(other_mode)
if other_ckey != primary_ckey:
cached = qa_conn.execute(
"SELECT * FROM providence_cache "
"WHERE cache_key = ? AND falsification_state = 'live'",
(other_ckey,),
).fetchone()
if cached is not None:
hit_ckey = other_ckey
lookup_path = f"{other_mode}_fallback"
cache_lookup_ms = _ms_since(t_phase)
if cached is not None:
progress.emit(
"cache.hit",
lookup_path=lookup_path,
ms=int(cache_lookup_ms),
)
now = int(time.time())
with transaction(qa_conn):
qa_conn.execute(
"UPDATE providence_cache "
"SET hit_count = hit_count + 1, last_hit_at = ? "
"WHERE cache_key = ?",
(now, hit_ckey),
)
return {
"status": "cache_hit",
"audit_mode": cached["audit_mode"],
"cache_key": hit_ckey,
"lookup_path": lookup_path,
"burned_existing": burned_existing,
"context_root": context_root,
"answer_text": cached["answer_text"],
"sources": json.loads(cached["merkle_proof"])["sources"],
"n_quotes": cached["n_quotes"],
"n_verified": cached["n_verified"],
"verifier_method": cached["verifier_method"],
"unverified_quotes": (
json.loads(cached["unverified_quotes"])
if cached["unverified_quotes"]
else []
),
# Cache schema doesn't carry the partially-verified split —
# those claims were folded into unverified_quotes pre-2026-
# 04-30. Cache hits surface an empty partial list; new
# writes populate it correctly. Acceptable degradation
# since governance_policy_hash invalidated prior records.
"partially_verified_quotes": [],
# Quantifier preflight (Ticket #000008 Phase 1) — the
# classifier is pure on the question string, so cache
# hits can re-classify cheaply and carry the same
# schema as miss-path rows. Bench rows stay
# column-aligned across hit/miss.
"quantifier_intensity": quantifier["intensity"],
"quantifier_matched_token": quantifier["matched_token"],
"scope_bound_hint": quantifier["scope_bound_hint"],
"quantifier_explicit_count": quantifier["explicit_count"],
"claim_cap_applied": claim_cap_lookup,
# Ticket #000010 — meta-cognition QuestionState. Pure
# function, cache hits re-classify cheaply.
"question_state": question_state.to_dict(),
# Ticket #000009 §7.2 — pull preflight_hash out of
# the persisted run_dag_blob. Cache hits don't
# rebuild the DAG; the blob carries the original
# preflight stage hash from the write-time policy.
# None when the cached row predates #000009.
# cached is a sqlite3.Row; column access via
# subscript, not .get(); guard with `keys()` since
# legacy rows may lack the run_dag_blob column.
"preflight_hash": _extract_preflight_hash_from_blob(
cached["run_dag_blob"] if "run_dag_blob" in cached.keys() else None
),
"prompt_chars": prompt_chars,
"answer_chars": len(cached["answer_text"] or ""),
"timings": {
"search_ms": search_ms,
"context_ms": context_ms,
"cache_lookup_ms": cache_lookup_ms,
"llm_ms": None,
"persist_ms": None,
"total_ms": _ms_since(t_start),
},
}
# 5. Cache miss — call LLM. JSON mode emits structured-output
# extras under all three engine conventions (vLLM `guided_json`,
# llama.cpp `json_schema`, OpenAI-spec `response_format`) so the
# schema is enforced regardless of which engine is behind the
# endpoint. Engines silently drop unknown keys. The user-supplied
# `extra_body` (e.g. {"chat_template_kwargs": {"enable_thinking":
# False}} for Qwen) merges on top — user keys override defaults
# if the caller set both, but the common case is disjoint
# namespaces (model-template knobs vs grammar guidance).
from arborist.qa.verify import claim_lattice_structured_output_extras
final_extra_body: dict | None = None
stop_seqs: list[str] | None = None
if answer_mode == "claim_lattice" and policy.get(
"claim_lattice_use_guided_json", True
):
final_extra_body = claim_lattice_structured_output_extras()
if extra_body:
final_extra_body = {**(final_extra_body or {}), **extra_body}
if answer_mode == "claim_lattice":
# JSON-mode token-runaway guard — see runner.py for the
# full rationale. Stops generation on a blank line so
# post-JSON whitespace spam doesn't blow max_tokens.
stop_seqs = list(policy.get(
"claim_lattice_json_stop_sequences", ["\n\n"]
))
progress.emit(
"cache.miss",
)
progress.emit(
"llm.start",
mode=answer_mode,
ctx=sum(len(m.get("content") or "") for m in messages),
)
t_phase = time.monotonic()
raw_answer = chat_client.chat_completion(
messages,
model=model_id,
temperature=policy["temperature"],
max_tokens=policy["max_tokens"],
top_p=policy.get("top_p", 1.0),
extra_body=final_extra_body,
stop=stop_seqs,
)
llm_ms = _ms_since(t_phase)
progress.emit(
"llm.done",
ms=int(llm_ms),
answer_chars=len(raw_answer or ""),
)
# 5b. Faithfulness check. Branch on answer_mode:
# "quote" substring-verify quoted spans against context.
# Optional repair loop (mechanical + reprompt).
# "claim_lattice" deterministic checks on the JSON output:
# evidence_id resolution, source_role allowlist,
# manual-quote prohibition. NO repair loop —
# one-shot benchmark discipline.
repair_changes: list[dict] = []
pre_repair_verdict: dict | None = None
progress.emit("verify.start", mode=answer_mode)
if answer_mode == "claim_lattice_pointer":
verdict = verify_claim_lattice(
raw_answer,
evidence_map,
allowed_source_roles=tuple(
policy.get(
"claim_lattice_allowed_source_roles",
[
"primary_answer_source",
"secondary_context_source",
"background_source",
"unclassified",
],
)
),
max_pointers_per_claim=int(policy.get(
"claim_lattice_max_pointers_per_claim", 2
)),
min_citation_coverage=float(policy.get(
"claim_lattice_min_citation_coverage", 0.30
)),
min_claim_content_tokens=int(policy.get(
"claim_lattice_min_claim_content_tokens", 3
)),
lazy_anchor_demote_threshold=float(policy.get(
"claim_lattice_lazy_anchor_demote_threshold", 0.5
)),
lazy_anchor_demote_min_pairs=int(policy.get(
"claim_lattice_lazy_anchor_demote_min_pairs", 3
)),
max_claims_per_answer=effective_max_claims,
subject_tokens_absent_threshold=int(policy.get(
"claim_lattice_subject_tokens_absent_threshold", 3
)),
question=question,
warrant_check_enabled=bool(policy.get(
"claim_lattice_warrant_check_enabled", True
)),
deflection_check_enabled=bool(policy.get(
"claim_lattice_deflection_check_enabled", True
)),
format_collapse_check_enabled=bool(policy.get(
"claim_lattice_format_collapse_check_enabled", True
)),
)
rendered = verdict["rendered_text"]
answer_text = rendered if rendered else raw_answer
elif answer_mode == "claim_lattice":
verdict = verify_claim_lattice_json(
raw_answer,
evidence_map,
allowed_source_roles=tuple(
policy.get(
"claim_lattice_allowed_source_roles",
[
"primary_answer_source",
"secondary_context_source",
"background_source",
"unclassified",
],
)
),
max_evidence_per_claim=int(policy.get(
"claim_lattice_max_pointers_per_claim", 2
)),
min_citation_coverage=float(policy.get(
"claim_lattice_min_citation_coverage", 0.30
)),
max_claims_per_answer=effective_max_claims,
subject_tokens_absent_threshold=int(policy.get(
"claim_lattice_subject_tokens_absent_threshold", 3
)),
question=question,
warrant_check_enabled=bool(policy.get(
"claim_lattice_warrant_check_enabled", True
)),
deflection_check_enabled=bool(policy.get(
"claim_lattice_deflection_check_enabled", True
)),
)
rendered = verdict["rendered_text"]
answer_text = rendered if rendered else raw_answer
else:
answer_text = raw_answer
verdict = verify_quotes(
answer_text,
context,
entity_policy=policy.get("entity_policy", "hybrid"),
proximity_n=policy.get("entity_proximity_n", 3),
proximity_window=policy.get("entity_proximity_window", 300),
)
def _verify(text: str) -> dict:
return verify_quotes(
text,
context,
entity_policy=policy.get("entity_policy", "hybrid"),
proximity_n=policy.get("entity_proximity_n", 3),
proximity_window=policy.get("entity_proximity_window", 300),
)
if (
policy.get("repair_enabled")
and verdict["audit_mode"] != "STRICT"
and verdict.get("unverified_quotes")
):
# Tier 1: mechanical (deterministic, no extra LLM call).
repair_result = mechanical_repair(
answer_text, verdict["unverified_quotes"], context
)
if repair_result["changes"]:
new_verdict = _verify(repair_result["repaired_text"])
if new_verdict["n_verified"] >= verdict["n_verified"]:
pre_repair_verdict = verdict
answer_text = repair_result["repaired_text"]
verdict = new_verdict
repair_changes = list(repair_result["changes"])
# Tier 2: re-prompt feedback (one extra LLM call max).
max_reprompts = int(policy.get("repair_max_reprompts", 0))
for _ in range(max_reprompts):
if (
verdict["audit_mode"] == "STRICT"
or not verdict.get("unverified_quotes")
):
break
new_text = reprompt_repair(
chat_client=chat_client,
model_id=model_id,
original_messages=messages,
original_answer=answer_text,
failed_quotes=verdict["unverified_quotes"],
policy=policy,
)
if not new_text:
break
new_verdict = _verify(new_text)
if new_verdict["n_verified"] > verdict["n_verified"]:
if pre_repair_verdict is None:
pre_repair_verdict = verdict
answer_text = new_text
verdict = new_verdict
repair_changes.append({
"action": "reprompt_rewrite",
"diagnosis": "model_feedback_loop",
})
else:
break
unverified_blob = (
json.dumps(verdict["unverified_quotes"], separators=(",", ":"))
if verdict["unverified_quotes"]
else None
)
progress.emit(
"verify.done",
audit_mode=verdict.get("audit_mode"),
n_verified=verdict.get("n_verified"),
n_quotes=verdict.get("n_quotes"),
)
# 6. Persist record + audit event.
progress.emit("persist.start")
t_phase = time.monotonic()
proof_obj = {
"context_root": context_root,
"sources": [
{
"document_root": h.document_root,
"document_uri": h.document_uri,
"title": h.title,
"score": h.score,
"chunk_idx": h.chunk_idx,
"shard": Path(h.shard_path).name,
"source_role": h.source_role,
}
for h in chosen
],
}
proof_blob = json.dumps(proof_obj, separators=(",", ":"))
# Per-run Merkle-DAG. Quote mode base shape: 7 stages
# (question / retrieval / context / prompt / answer / verify
# / final_label); becomes 8 when preflight_hash is supplied
# (#000009 inserts ``preflight`` between question & retrieval).
# Pointer mode base shape: 9 stages (question / retrieval /
# evidence_map / prompt / raw_answer / parsed_claim_lattice /
# verify / render / final_label); becomes 10 with preflight.
# Reject-broad early-return path uses a 3-stage minimal DAG
# (question / preflight / final_label) via build_reject_run_dag().
ev_root = evidence_map_root(evidence_map) if evidence_map else None
parsed_lattice = None
is_lattice_mode = answer_mode in ("claim_lattice_pointer", "claim_lattice")
if is_lattice_mode:
evidence_id_pairs = verdict.get("evidence_id_pairs") or []
parsed_lattice = [
{
"claim_text": cs.get("text", ""),
"evidence_ids": evidence_id_pairs[i] if i < len(evidence_id_pairs) else [],
}
for i, cs in enumerate(verdict.get("claim_statuses") or [])
]
# Render-layer source-role display + used/unused. Compute
# which document_roots the verified evidence_ids point at
# (each EvidenceObject's source_root field carries the
# document_root the chunk came from), then annotate each
# source dict in proof_obj. Lets the CLI show per-source
# `primary_answer_source — used (E1)` style annotations.
used_doc_roots: set[str] = set()
evidence_id_to_source_root = {
e.evidence_id: e.source_root for e in (evidence_map or [])
}
for pair in evidence_id_pairs:
for eid in (pair or []):
sroot = evidence_id_to_source_root.get(eid)
if sroot:
used_doc_roots.add(sroot)
# Also map document_root → list of pointer_ids that cited it,
# so the renderer can show the actual pointer tags
# ("used (E1)" not just "used").
evidence_id_to_pointer = {
e.evidence_id: e.pointer_id for e in (evidence_map or [])
}
doc_root_to_pointers: dict[str, list[str]] = {}
for pair in evidence_id_pairs:
for eid in (pair or []):
sroot = evidence_id_to_source_root.get(eid)
pid = evidence_id_to_pointer.get(eid)
if sroot and pid and pid not in doc_root_to_pointers.get(sroot, []):
doc_root_to_pointers.setdefault(sroot, []).append(pid)
for s in proof_obj["sources"]:
droot = s.get("document_root")
s["used"] = droot in used_doc_roots
s["used_pointer_ids"] = doc_root_to_pointers.get(droot, [])
# Retrieval purity metrics — sidecar signals for the bench
# ("did the model ignore noise? did retrieval over-fetch?").
# Pure observation, no decision. Useful for the
# noise-resistance bench fixture and for spotting a
# creeping retrieval-quality regression at aggregate scale.
primary_rank = next(
(i for i, s in enumerate(proof_obj["sources"], start=1)
if s.get("source_role") == "primary_answer_source"),
0, # 0 = no primary in top-K
)
noisy_roles = {
"noisy_background_source", "sequel_background_source",
}
noise_sources = [
s for s in proof_obj["sources"]
if s.get("source_role") in noisy_roles
]
noise_used = [s for s in noise_sources if s.get("used")]
retrieval_purity = {
"primary_rank": primary_rank,
"primary_used": (
primary_rank > 0
and proof_obj["sources"][primary_rank - 1].get("used") is True
),
"noise_sources_count": len(noise_sources),
"noise_sources_used": len(noise_used),
"total_sources": len(proof_obj["sources"]),
"used_sources": sum(
1 for s in proof_obj["sources"] if s.get("used")
),
}
proof_obj["retrieval_purity"] = retrieval_purity
# Retrieval-plan hash (Ticket #000001 / Directive D4):
# capture the operator-influenceable retrieval inputs so the
# run-DAG's retrieval stage binds BOTH plan (what guided the
# search) and result (what got chosen). Folds shard ids when
# available so an audit can reproduce which shards the search
# ran against. Question text intentionally NOT included here
# — already covered by question_hash.
plan = RetrievalPlan(
retrieval_keywords=retrieval_keywords or "",
top_k=int(top_k),
over_fetch=int(over_fetch),
max_context_chars=int(max_context_chars),
shard_ids=tuple(
sorted(
{root_to_shard.get(h.document_root, h.shard_path or "")
for h in chosen if (root_to_shard.get(h.document_root)
or h.shard_path)}
)
),
# #000056 — bind the MT engine identity into the retrieval
# plan iff the sandwich actually translated the query
# (empty otherwise → RetrievalPlan.canonical() omits it →
# zero hash churn on every non-MT run).
mt_engine=(getattr(_mt, "engine_id", "") if _sandwich_en_q else ""),
mt_manifest_hash=(
getattr(_mt, "manifest_hash", "") if _sandwich_en_q else ""
),
source_lang=(_src_lang if _sandwich_en_q else ""),
# Dav1d review 2026-05-19 — bind the active title-token
# fold set so retrieval is replay-identifiable (#000001-
# family, run-DAG not governance).
title_token_policy=_TITLE_TOKEN_POLICY,
)
plan_hash = retrieval_plan_hash(plan)
# Ticket #000009 — preflight node binding (nested CTI clauses
# per ticket §8 / 2026-05-04 feedback). Single DAG stage with
# five nested clauses (classifier, answer_contract,
# prompt_contract, evidence_contract, policy_refs) + the
# metacognition QuestionState.
from arborist.qa.dag import preflight_node_hash
# verifier_policy_hash + model_profile_hash already imported
# at module top; reusing the existing names. Local re-imports
# would shadow earlier free-variable uses.
ghash_for_dag = verifier_policy_hash(policy)
claim_cap_actually_applied = (
claim_cap_lookup
if (quantifier_apply_caps
and quantifier_caps_mode_gated
and claim_cap_lookup is not None)
else None
)
# Reminder injection actually fires when guard is on AND
# mode-gated AND quantifier is broad AND policy enables it.
# Mirrors the gate in the runner.ask() / query() reminder
# block above.
reminder_eligible = (
quantifier_guard_on
and quantifier_mode_gated
and quantifier.get("is_broad", False)
)
reminder_enabled = bool(policy.get("quantifier_reminder_enabled", False))
reminder_injected = reminder_eligible and reminder_enabled
reminder_template_id = None
if reminder_injected:
reminder_template_id = (
"broad-quantifier-bounded-v1"
if quantifier.get("scope_bound_hint") == "bounded"
else "broad-quantifier-unbounded-v1"
)
# Build the canonical payload once; hash it AND persist it
# alongside the DAG nodes so audit replay can render the
# full 5-clause CTI contract via `arborist providence
# --show-preflight`. Hash is deterministic from payload, so
# an auditor can re-verify:
# _sha256_hex(_canonical_json(preflight_payload))
# == nodes[preflight_idx]["hash"]
from arborist.qa.dag import build_preflight_node_payload
_preflight_payload = build_preflight_node_payload(
question_state=question_state.to_dict(),
quantifier=quantifier,
answer_contract={
"guard_enabled": quantifier_guard_on,
"mode_gated": quantifier_mode_gated,
"apply_caps_active": quantifier_apply_caps,
"apply_caps_mode_gated": quantifier_caps_mode_gated,
"claim_cap_resolved": claim_cap_lookup,
"claim_cap_applied": claim_cap_actually_applied,
"manual_quotes_allowed": False,
"evidence_pointer_required": is_lattice_mode,
"allow_unbounded_enumeration": False,
"reject_broad_active": bool(
policy.get("quantifier_reject_broad", False)
),
"metacognition_enabled": bool(
policy.get("metacognition_enabled", True)
),
"block_on_contradiction": bool(
policy.get(
"metacognition_block_on_contradiction", False
)
),
},
prompt_contract={
"reminder_enabled": reminder_enabled,
"reminder_injected": reminder_injected,
"reminder_template_id": reminder_template_id,
},
evidence_contract={
"max_evidence_ids_exposed": int(policy.get(
"claim_lattice_max_pointers_per_claim", 2
)),
"one_claim_per_line": is_lattice_mode,
},
policy_refs={
"governance_policy_hash": ghash_for_dag,
"model_profile_hash": mhash,
"answer_mode": answer_mode,
},
)
from arborist.qa.dag import _sha256_hex, _canonical_json
preflight_hash = _sha256_hex(_canonical_json(_preflight_payload))
run_dag = build_run_dag(
question_hash=qhash,
sources=proof_obj["sources"],
context_root=context_root,
conversation_hash=chash,
answer_text=answer_text,
audit_mode=verdict["audit_mode"],
verifier_method=verdict["verifier_method"],
n_quotes=verdict["n_quotes"],
n_verified=verdict["n_verified"],
claim_statuses=verdict.get("claim_statuses", []),
lookup_path="miss",
evidence_map_root=ev_root,
answer_mode=answer_mode if answer_mode != "quote" else None,
violations=verdict.get("violations"),
raw_answer_text=raw_answer if is_lattice_mode else None,
parsed_lattice=parsed_lattice,
rendered_text=answer_text if is_lattice_mode else None,
retrieval_plan_hash=plan_hash,
preflight_hash=preflight_hash,
preflight_payload=_preflight_payload,
)
run_dag_blob = json.dumps(run_dag, separators=(",", ":"))
now = int(time.time())
with transaction(qa_conn):
# Record the repair event BEFORE the providence_query event so
# the audit chain shows: repair-happened, THEN we wrote the
# final record. Repair body links pre→post verdicts so an
# auditor can reconstruct what changed.
if repair_changes and pre_repair_verdict is not None:
append_audit(
qa_conn,
event_type="providence_repair",
subject_root=ckey,
body={
"kind": "mechanical",
"n_changes": len(repair_changes),
"changes": repair_changes,
"pre_audit_mode": pre_repair_verdict["audit_mode"],
"post_audit_mode": verdict["audit_mode"],
"pre_n_verified": pre_repair_verdict["n_verified"],
"post_n_verified": verdict["n_verified"],
},
ts=now,
)
event_hash = append_audit(
qa_conn,
event_type="providence_query",
subject_root=ckey,
body={
"context_root": context_root,
"n_sources": len(chosen),
"model_id": model_id,
"revision": revision,
"quantization": quantization,
"answer_chars": len(answer_text),
"context_chars": len(context),
"audit_mode": verdict["audit_mode"],
"n_quotes": verdict["n_quotes"],
"n_verified": verdict["n_verified"],
"verifier_method": verdict["verifier_method"],
},
ts=now,
)
qa_conn.execute(
# ON CONFLICT(cache_key) DO NOTHING: the cache lookup runs
# outside this transaction, so two concurrent queries on the
# same cache_key can both miss and both reach this INSERT — the
# loser no-ops instead of raising UNIQUE constraint failed.
"INSERT INTO providence_cache "
"(cache_key, source_root, document_uri, question_hash, question_text, "
" answer_text, merkle_proof, model_profile_hash, conversation_hash, "
" governance_policy_hash, schema_version, canonicalization_version, "
" chunking_version, falsification_state, chain, audit_event_hash, "
" created_at, hit_count, audit_mode, n_quotes, n_verified, "
" unverified_quotes, verifier_method, run_dag_root, run_dag_blob) "
"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 'live', ?, ?, ?, 0, "
" ?, ?, ?, ?, ?, ?, ?) "
"ON CONFLICT(cache_key) DO NOTHING",
(
ckey,
context_root,
"corpus://multi-source",
qhash,
question,
answer_text,
proof_blob,
mhash,
chash,
ghash,
SCHEMA_VERSION,
CANONICALIZATION_VERSION,
CHUNKING_VERSION,
chain,
event_hash,
now,
verdict["audit_mode"],
verdict["n_quotes"],
verdict["n_verified"],
unverified_blob,
verdict["verifier_method"],
run_dag["root"],
run_dag_blob,
),
)
finally:
qa_conn.close()
persist_ms = _ms_since(t_phase)
progress.emit("persist.done", ms=int(persist_ms))
# Pull the verify node's failure_stage out of the run_dag for the result.
failure_stage = next(
(n.get("hash") for n in run_dag["nodes"] if n["stage"] == "verify"),
None,
)
# The actual label is in the verify_payload, which we computed in
# localize_failure earlier — recompute for the result dict.
from arborist.qa.dag import localize_failure as _localize
failure_stage = _localize(
audit_mode=verdict["audit_mode"],
n_sources=len(chosen),
n_quotes=verdict["n_quotes"],
n_verified=verdict["n_verified"],
)
result = {
"status": "cache_miss_then_written",
"audit_mode": verdict["audit_mode"],
"cache_key": ckey,
"run_dag_root": run_dag["root"],
"lookup_path": "miss",
"failure_stage": failure_stage,
"repair_changes": repair_changes,
"pre_repair_audit_mode": (
pre_repair_verdict["audit_mode"] if pre_repair_verdict else None
),
"burned_existing": burned_existing,
"context_root": context_root,
"answer_text": answer_text,
"sources": proof_obj["sources"],
"n_quotes": verdict["n_quotes"],
"n_verified": verdict["n_verified"],
"verifier_method": verdict["verifier_method"],
"unverified_quotes": verdict["unverified_quotes"],
"partially_verified_quotes": verdict.get("partially_verified_quotes") or [],
# Violations (per-claim hard-check failures + soft demotes
# like LAZY_ANCHOR_DEMOTE / WARRANT_MISSING). Surfaced so the
# CLI render can name WHY a verdict capped at HYBRID instead
# of reaching STRICT. Persisted into run_dag_blob via the
# verify stage's payload.
"violations": verdict.get("violations") or [],
# Phase 3 of #000031 — per-claim warrant-chain proven idxs.
# Populated when a per-claim warrant_check would have failed
# but the cited chunk's document has a warrant-resolver
# derivation row (Merkle-bound primary-source backing). The
# render layer surfaces this as ``· warrant proven via chain
# ×N`` in the audit-line tail so operators see when a claim
# got through on the chain, not on lexical anchors.
"warrant_proven_claim_idxs": verdict.get("warrant_proven_claim_idxs") or [],
# Format-collapse signal (pointer-mode only — None elsewhere).
# True when the model emitted ≥5 meaningful prose lines with
# zero `[E\d+]` pointer tags, i.e. abandoned the
# claim_lattice_pointer protocol entirely. Surfaced on the
# result dict so bench harness can measure FC rate without
# re-deriving it from raw_answer (which is lattice-only).
# See verify.py:format_collapsed and ticket #000008.
"format_collapsed": verdict.get("format_collapsed"),
# Model's raw output before the renderer interpolates literal
# spans. Lattice modes only — quote/span/entity/paraphrase
# rows have answer_text == raw_answer so this stays None to
# avoid duplication. Bench reads it for bracket-count
# diagnostics; never persisted in providence_cache.
"raw_answer": raw_answer if is_lattice_mode else None,
# Quantifier preflight result (Ticket #000008 Phase 1).
# Surfaced on the result so bench rows pick it up. Phase 1
# is dry-run only — caps not applied; Phase 2 wires
# `claim_cap_applied`. None semantics: classifier always
# returns a dict so these are never None on the miss path,
# but the cache-hit path (line ~2080) doesn't carry them
# since the cached record predates the classifier.
"quantifier_intensity": quantifier["intensity"],
"quantifier_matched_token": quantifier["matched_token"],
"scope_bound_hint": quantifier["scope_bound_hint"],
"quantifier_explicit_count": quantifier["explicit_count"],
# Cap that was looked up for this run. NULL semantics:
# None → guard off, mode opted out, or classifier
# returned no intensity (no cap recorded).
# int → cap that WOULD apply to the verifier when
# quantifier_guard_apply_caps=True. Reported on
# dry-run rows so bench can chart cap distribution
# before the gate flips.
# Phase 2 dry-run: cap is REPORTED but not APPLIED to the
# verifier — claim_lattice_max_claims_per_answer (default
# 12) still drives TOO_MANY_CLAIMS demote until
# quantifier_guard_apply_caps flips True.
"claim_cap_applied": claim_cap_lookup,
# Ticket #000010 — meta-cognition QuestionState. Surfaces
# the full preflight classification (logical_statuses,
# question_shape, contradiction_pairs, false_premise_hints,
# temporal_sensitivity, etc.) for bench / CLI render. First
# pass does NOT bind into run_dag_root (deferred to ticket
# #000009 Phase 5).
"question_state": question_state.to_dict(),
# Ticket #000009 §7.2 — preflight stage hash. Surfaced
# for cross-row preflight-policy comparison in bench /
# operator tools. Same hash that's bound into the
# `preflight` stage of run_dag_root.
"preflight_hash": preflight_hash,
# Ticket #000011 — soft preflight sidecar hint. Advisory
# only; NEVER enters the verifier proof path. Renderer
# surfaces as `· soft: <label>` on the audit-line tail.
"soft_preflight_hint": soft_hint.to_dict(),
# Sidecar smell signals (claim_lattice mode only) — surfaced
# for the renderer; never persisted in providence_cache and
# never threaded into run_dag_root.
"pointer_id_distribution": verdict.get("pointer_id_distribution"),
"lazy_anchor_ratio": verdict.get("lazy_anchor_ratio"),
# Retrieval-purity sidecar (claim_lattice modes; None for
# quote/span/entity/paraphrase). Render-layer + bench-tracking
# metric: surfaces "model ignored N noisy sources, used the
# primary at rank R" without folding into the proof path.
"retrieval_purity": proof_obj.get("retrieval_purity"),
"prompt_chars": prompt_chars,
"answer_chars": len(answer_text or ""),
# Frame detection (Ticket #000002 / Module L). Sidecar signal
# for renderer / bench; never persisted in providence_cache,
# never enters governance_policy_hash. None for quote-mode
# rows; populated for lattice-mode rows.
"frame_detection": (
{
"kind": frame_detection.frame_kind,
"reference_title": frame_detection.reference_title,
"reference_uri": frame_detection.reference_uri,
"confidence": frame_detection.confidence,
}
if frame_detection is not None
else None
),
"timings": {
"preflight_ms": preflight_ms,
"soft_preflight_ms": soft_preflight_ms,
"search_ms": search_ms,
"context_ms": context_ms,
"cache_lookup_ms": cache_lookup_ms,
"llm_ms": llm_ms,
"persist_ms": persist_ms,
"total_ms": _ms_since(t_start),
},
}
if os.environ.get("ARBORIST_NLI_SHADOW"):
# #000049 Phase 2 — surface the verifier-input text so an
# off-line NLI shadow sweep can re-derive (answer, source) pairs
# and measure the would-demote rate on real traffic (§7 #12
# gate item 4). Off by default — bloats bench rows; this is a
# measurement hook, never a cache_key / governance / audit_mode
# input.
result["verifier_input_text"] = (
rendered_evidence if is_lattice_mode else context
)
# Ticket #000056 — Operation Sandwich edge OUT. Render the
# *already-verified English* `answer_text` back to the user's
# language as DISPLAY-ONLY, additive keys. This runs AFTER the
# proof/cache/run-DAG are finalised above: it is never the
# verifier's input, never hashed into the proof, never persisted
# as a proof column. `answer_text` (the grounded string) and
# `audit_mode` are untouched — the Spanish text carries zero
# grounding and is banner-labelled (the `_render_audit_label`
# render-projection discipline).
if _sandwich_en_q and _mt is not None:
# Translate the model's PROSE, never the rendered evidence
# scaffold. In lattice modes `answer_text` is the rendered
# answer with interpolated *verbatim pinned English source
# spans* (`[E1 | Title | hash: "quote"]`) — MT-ing those would
# corrupt the very spans the verifier matched. `raw_answer`
# (model output, present in lattice modes) is the prose +
# opaque [E1] markers; quote mode has no scaffold so
# `answer_text` is already prose.
_src_en = result.get("raw_answer") or result.get("answer_text") or ""
_es = _mt.translate(_src_en, "en", _src_lang) if _src_en else ""
result["display_answer"] = _es or _src_en
result["display_lang"] = _src_lang
result["display_source_lang"] = "en"
result["display_translated"] = True
result["display_engine"] = getattr(_mt, "engine_id", "")
result["display_unverified_banner"] = (
"Traducción automática para lectura — la verificación se "
"realizó sobre la respuesta en inglés y sus fuentes en "
"inglés. / Machine-translated for display; grounding was "
"verified on the English answer against English sources."
)
return result
def _ms_since(t: float) -> float:
"""Wall-time elapsed in milliseconds, rounded to 1 decimal."""
return round((time.monotonic() - t) * 1000, 1)
# Pure-arithmetic shape: digits, dots, /, *, +, -, parens, **, whitespace.
# Rejects letters — no identifiers (so "x+1" or "what is 2+2" fall through).
_CANONICAL_ARITHMETIC_RE = re.compile(r"^[\s\d./*+\-()]+$")
# Symbolic-algebra shape (ticket #000030): expression syntax that allows
# lowercase identifiers in addition to arithmetic chars. Uppercase is
# reserved for the logic route. Anchored fullmatch so trailing punctuation
# / question marks fall through cleanly.
_CANONICAL_ALGEBRA_RE = re.compile(r"^[\s\d.a-z_*+\-/()]+$")
# Reject leading natural-language verb followed by an operand. Catches
# "simplify (x+1)**2", "factor x**2 - 1", "expand (a+b)**2", "evaluate
# sin(x)" while still accepting `alpha + beta` (multi-letter Greek-name
# variable on the left of an operator). The 4-char minimum keeps
# common math identifiers (`x`, `xy`, `sin`, `cos`, `pi`) from tripping.
_CANONICAL_ALGEBRA_NL_LEAD_RE = re.compile(r"^\s*[a-z_]{4,}\s+[(\w]")
# Propositional logic shape: only uppercase atoms and reserved keywords.
_CANONICAL_LOGIC_KEYWORDS = {
"AND", "OR", "NOT", "IMPL", "IFF", "XOR", "TRUE", "FALSE",
}
_CANONICAL_LOGIC_TOKEN_RE = re.compile(r"[A-Z][A-Z]*|[()]")
_CANONICAL_LOGIC_VALID_RE = re.compile(r"^[\sA-Z()]+$")
def _canonical_projection_preflight(question: str) -> tuple[str, bytes] | None:
"""Try a math/logic π* before RAG starts. Returns
``(pi_star_ref, canonical_bytes)`` on success; ``None`` to fall
through to text retrieval.
Sniff is conservative — pure-arithmetic shape (no letters at all)
or pure-propositional shape (uppercase atoms + reserved keywords
only). Anything natural-language (a question mark, lowercase
words, mixed shape) returns ``None`` unmodified. The π* itself
is the final arbiter: a shape match that fails canonicalization
(PiStarError) also falls through.
"""
from arborist.pi_star import PiStarError, get
q = question.strip()
if not q:
return None
# Math route — pure-arithmetic shape.
if _CANONICAL_ARITHMETIC_RE.fullmatch(q):
try:
out = get("arithmetic@v1").canonicalize(q.encode("utf-8"))
return ("arithmetic@v1", out)
except PiStarError:
return None
# Algebra route (ticket #000030) — lowercase letters allowed, but
# require at least one letter (else arithmetic would have caught it)
# AND no leading natural-language verb. The π* itself is the final
# arbiter: a shape match that fails canonicalization (PiStarError,
# boolean-shaped input, etc.) falls through gracefully.
if (
_CANONICAL_ALGEBRA_RE.fullmatch(q)
and any(c.isalpha() for c in q)
and not _CANONICAL_ALGEBRA_NL_LEAD_RE.match(q)
):
try:
out = get("algebra-symbolic@v1").canonicalize(q.encode("utf-8"))
return ("algebra-symbolic@v1", out)
except (PiStarError, KeyError):
# KeyError when sympy is missing and algebra-symbolic@v1 was
# never registered; PiStarError on parse / domain failures.
return None
# Logic route — uppercase atoms + reserved keywords only. Require
# at least one logic operator keyword so a bare "A" doesn't trip
# the route (a one-atom question is more likely natural language).
if _CANONICAL_LOGIC_VALID_RE.fullmatch(q):
words = [w for w in _CANONICAL_LOGIC_TOKEN_RE.findall(q) if w not in "()"]
has_keyword = any(w in _CANONICAL_LOGIC_KEYWORDS for w in words)
if has_keyword:
try:
out = get("logic-kernel@v1").canonicalize(q.encode("utf-8"))
return ("logic-kernel@v1", out)
except PiStarError:
return None
return None