"""Retrieval-time concept lookup. Cross-shard, read-only.
Public API matches the legacy ``arborist.qa.concepts`` shape so existing
call sites in ``query.py`` keep working unchanged. Behavior changes:
- Backed by the ``concept_relations`` SQLite table instead of in-Python
frozensets.
- Walks every shard in ``shards_dir`` (same UNION pattern as cross-shard
FTS5 search). Concept relations from shard 003 are visible to a query
routed at shard 000 — exactly what we want for a 3.47M-doc corpus
split across many shards.
- Token comparison is case-insensitive at the SQL layer (NOCASE on
LOWER()). Stored capitalization is preserved.
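Cache: per-process dicts keyed on the resolved ``shards_dir`` path and
validated against a signature of every shard file's mtime (plain dicts,
no LRU eviction). Lookups in a hot loop don't re-walk shards. A cache
entry is rebuilt when any shard file's mtime changes, or when a shard is
added or removed (e.g. after `arborist concepts derive` writes new rows).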
"""
from __future__ import annotations
import re
import time
from pathlib import Path
from arborist.store import connect_query
# Tokens that mean "user wants both sides of any rivalry shown" —
# kept here (not in DB) because compare-phrasing detection is a
# query-classification task, not a corpus-derived signal.
# Entries are lowercase ASCII words: ``has_compare_phrasing`` matches
# them against ``[a-z]+`` runs of the lowercased question, so an entry
# containing uppercase letters, digits or punctuation could never match.
COMPARE_WORDS: frozenset[str] = frozenset({
"vs", "versus", "compare", "compared", "comparison", "compares",
"between", "difference", "differences", "or", "either",
})
[docs]
def has_compare_phrasing(question: str) -> bool:
    """Report whether ``question`` uses comparison wording.

    The question is lowercased and split into runs of ASCII letters; the
    result is true when at least one of those runs is a member of
    ``COMPARE_WORDS``. ``None`` or an empty string yields ``False``.
    """
    text = question.lower() if question else ""
    found = set(re.findall(r"[a-z]+", text))
    return not found.isdisjoint(COMPARE_WORDS)
# ---------------------------------------------------------------------------
# Cross-shard lookup with mtime-keyed cache
# ---------------------------------------------------------------------------
# Eager-load cache (legacy) — kept for back-compat callers that still
# go through `_get_indices`. The hot path (synonym_expand,
# rivalry_excluded) uses the lazy caches below.
#
# Keyed on the resolved shards_dir path.
# Cache shape: { shards_dir_str: (mtime_sig, manual_index, derived_index, rivalry_pairs, token_idf) }
# - mtime_sig: tuple returned by `_shards_mtime_signature`; a mismatch forces a reload
# - manual_index: curated synonym edges; always expanded
# - derived_index: corpus-derived synonym edges; expansion subject to per-token cap
# - rivalry_pairs: list of (set_a, set_b) — frozensets of lowercase tokens
# - token_idf: { token_lower: doc_freq } summed across shards, used for cap-time ranking
_CACHE: dict[str, tuple[tuple, dict, dict, list, dict]] = {}
# Lazy per-token neighbor cache (#000026 follow-up). The eager loader
# pulled all 290K concept_relations rows on first call (~2.4 s) so a
# single CLI query paid that cost up front; benchmarks amortize it.
# We don't actually need the full graph — `synonym_expand` only ever
# touches direct neighbors of the question's tokens. For a 5-token
# question that's <300 rows, queryable in ~300 ms total via
# WHERE token IN (...) OR target IN (...).
#
# Shape: { shards_dir_str: { "sig": mtime_signature, "manual": {token: {neighbors}},
# "derived": {token: {neighbors}} } }
# A token is "known" once we've queried for it; the empty-set claim
# means "we asked SQL and got nothing back" — distinguishes from
# "we haven't asked yet."
# NOTE(review): `_load_neighbors_for` inserts edges in both directions,
# so a row fetched for a queried token also creates a key for that
# token's neighbor. Presence of a key therefore does not by itself
# prove that the key's own neighborhood was fetched.
_NEIGHBOR_CACHE: dict[str, dict] = {}
# Process-wide rivalry-row cache. The corpus has ~2 rivalry rows
# total today, so this is essentially free; cache lets repeated calls
# skip the round-trip entirely.
# Shape: { shards_dir_str: (mtime_sig, [(token, target), ...]) }
# Pairs are stored lowercased; see `_load_rivalry_rows`.
_RIVALRY_ROWS_CACHE: dict[str, tuple[tuple, list[tuple[str, str]]]] = {}
def _shards_mtime_signature(shards_dir: Path) -> tuple:
    """Fingerprint the shard files in ``shards_dir``.

    Returns one ``(resolved_path, mtime_ns)`` pair per ``*.db`` file found
    directly in the directory, ordered by path. The result is the empty
    tuple when ``shards_dir`` is not a directory. Two calls compare equal
    as long as no shard was added, removed, or had its mtime changed.
    """
    if not shards_dir.is_dir():
        return ()
    signature = []
    for db_path in sorted(shards_dir.glob("*.db")):
        resolved = str(db_path.resolve())
        signature.append((resolved, db_path.stat().st_mtime_ns))
    return tuple(signature)
def _load_token_idf(shards_dir: Path) -> dict[str, int]:
    """Sum per-token doc_freq across all shards.

    The concept_token_idf table is per-shard (the `chunks_fts` index
    it derives from is per-shard), so cross-shard ranking aggregates
    via SUM over the UNION view exposed by connect_query. Tokens not
    in any shard's table get NO entry; callers default to "treat as
    rare" via dict.get() with a high-rarity default.

    Args:
        shards_dir: Directory handed to ``connect_query``.

    Returns:
        ``{token_lower: doc_freq}``; NULL tokens map to the ``""`` key and
        NULL sums to ``0``.
    """
    conn = connect_query(shards_dir=shards_dir)
    try:
        rows = conn.execute(
            "SELECT token, SUM(doc_freq) AS df FROM concept_token_idf GROUP BY token"
        ).fetchall()
    finally:
        # Release the connection even when the query raises (e.g. the
        # table is missing), matching the other loaders in this module.
        conn.close()
    return {(r["token"] or "").lower(): int(r["df"] or 0) for r in rows}
def _load_indices(shards_dir: Path) -> tuple[dict, dict, list]:
    """Walk all shards, materialize the synonym & rivalry indices.

    Synonyms map every token to its **direct neighbors only**, NOT the
    transitive closure across reciprocal-link chains. Why: union-find
    over the full Wikipedia reciprocal-link graph collapses everything
    into one giant connected component (54k+ tokens for any seed in
    a 4-shard corpus). Direct-neighbor expansion preserves the legacy
    frozenset semantics (every member of a group expanded to the others
    in that group, but the groups didn't chain).

    Two synonym indices are built, one per evidence class:

    - ``manual_index`` — manual_legacy + manual rows. Curated; always
      expanded regardless of per-token degree. Captures the brain-tech /
      AMD-family / etc. seed groups whose anchor token has many
      deliberate members (e.g. telepathy → 29 members of the brain-tech
      group).
    - ``derived_index`` — link_reciprocity & other corpus-derived
      extractors. Subject to per-token degree cap because the Wikipedia
      link graph carries topic-adjacency noise on generic tokens
      (person, thoughts, language).

    Rivalry pairs use the union of both indices for closure.

    Args:
        shards_dir: Directory handed to ``connect_query``.

    Returns:
        ``(manual_index, derived_index, rivalry_pairs)`` where both
        indices map a lowercase token to the set of its lowercase direct
        neighbors and ``rivalry_pairs`` is a list of
        ``(frozenset, frozenset)`` — each side being the rival token plus
        its direct synonym neighbors.
    """
    conn = connect_query(shards_dir=shards_dir)
    try:
        rows = conn.execute(
            "SELECT relation_kind, evidence_kind, token, target FROM concept_relations"
        ).fetchall()
    finally:
        # Release the connection even when the query raises, matching
        # the lazy loaders in this module.
        conn.close()

    manual_index: dict[str, set[str]] = {}
    derived_index: dict[str, set[str]] = {}
    rivalry_rows: list[tuple[str, str]] = []
    # Curated evidence kinds — never capped at expansion time.
    manual_kinds = {"manual", "manual_legacy"}
    for r in rows:
        kind = r["relation_kind"]
        evidence_kind = r["evidence_kind"]
        a = (r["token"] or "").lower()
        b = (r["target"] or "").lower()
        # Skip NULL/empty endpoints and self-loops.
        if not a or not b or a == b:
            continue
        if kind == "synonym":
            target_index = (
                manual_index if evidence_kind in manual_kinds else derived_index
            )
            # Edges are stored symmetrically regardless of row direction.
            target_index.setdefault(a, set()).add(b)
            target_index.setdefault(b, set()).add(a)
        elif kind == "rivalry":
            rivalry_rows.append((a, b))

    # Rivalry pairs use union of manual + derived neighborhoods. They are
    # built after the loop so every synonym edge has been seen first.
    rivalry_pairs: list[tuple[frozenset[str], frozenset[str]]] = []
    for a, b in rivalry_rows:
        ga = frozenset(
            manual_index.get(a, set()) | derived_index.get(a, set()) | {a}
        )
        gb = frozenset(
            manual_index.get(b, set()) | derived_index.get(b, set()) | {b}
        )
        rivalry_pairs.append((ga, gb))
    return manual_index, derived_index, rivalry_pairs
def _get_indices(shards_dir: Path | str | None) -> tuple[dict, dict, list, dict]:
"""Return (manual_index, derived_index, rivalry_pairs, token_idf),
using cached values when the shard mtime signature is unchanged."""
if shards_dir is None:
return {}, {}, [], {}
p = Path(shards_dir)
key = str(p.resolve())
sig = _shards_mtime_signature(p)
cached = _CACHE.get(key)
if cached is not None and cached[0] == sig:
return cached[1], cached[2], cached[3], cached[4]
manual, derived, riv = _load_indices(p)
idf = _load_token_idf(p)
_CACHE[key] = (sig, manual, derived, riv, idf)
return manual, derived, riv, idf
[docs]
def invalidate_cache() -> None:
    """Empty every module-level cache (eager, neighbor, and rivalry).

    Intended for use right after a writer commits new rows in the same
    process: the mtime check would notice the change on the next read
    anyway, but clearing explicitly is faster for write-then-read flows.
    """
    for cache in (_CACHE, _NEIGHBOR_CACHE, _RIVALRY_ROWS_CACHE):
        cache.clear()
# ---------------------------------------------------------------------------
# Lazy neighbor loading — the hot path for synonym_expand / rivalry_excluded
# ---------------------------------------------------------------------------
def _ensure_neighbor_cache(shards_dir: Path) -> dict:
    """Hand back the live neighbor-cache entry for ``shards_dir``.

    The entry is keyed on the resolved directory path. When no entry
    exists, or its stored signature differs from the current shard mtime
    signature, a fresh empty entry replaces it before being returned.
    """
    signature = _shards_mtime_signature(shards_dir)
    cache_key = str(shards_dir.resolve())
    entry = _NEIGHBOR_CACHE.get(cache_key)
    if entry is not None and entry["sig"] == signature:
        return entry
    fresh = {"sig": signature, "manual": {}, "derived": {}}
    _NEIGHBOR_CACHE[cache_key] = fresh
    return fresh
def _load_neighbors_for(
    shards_dir: Path, tokens: set[str]
) -> tuple[dict[str, set[str]], dict[str, set[str]]]:
    """Lazy targeted loader for direct synonym neighbors.

    Hits SQL only for tokens that have not yet been asked about in this
    process (for the current shard mtime signature). Once a token has
    been queried — even with zero rows back — it is remembered so later
    calls don't re-query it.

    "Already asked" is tracked in an explicit ``queried`` set stored in
    the cache entry, not inferred from key presence in the neighbor maps.
    Rows fetched for token X insert edges in both directions, which
    creates a key for each neighbor Y of X; treating that key as "Y was
    queried" would make a later lookup for Y return only the edge back to
    X instead of Y's full neighborhood.

    Args:
        shards_dir: Directory handed to ``connect_query``.
        tokens: Tokens to look up; lowercased here, falsy entries skipped.

    Returns:
        ``(manual_subset, derived_subset)`` — each maps every requested
        lowercase token to its set of direct neighbors (empty when none).
        The sets are the live cache sets; callers must not mutate them.
    """
    cache = _ensure_neighbor_cache(shards_dir)
    # Lives inside the cache entry so it is discarded together with the
    # neighbor maps whenever the shard signature changes.
    queried: set[str] = cache.setdefault("queried", set())
    manual: dict[str, set[str]] = cache["manual"]
    derived: dict[str, set[str]] = cache["derived"]

    qlower = {t.lower() for t in tokens if t}
    missing = sorted(qlower - queried)
    if missing:
        ph = ",".join("?" * len(missing))
        # The same token list is bound twice: once per IN (...) clause.
        params = missing + missing
        conn = connect_query(shards_dir=shards_dir)
        try:
            rows = conn.execute(
                f"SELECT relation_kind, evidence_kind, token, target "
                f"FROM concept_relations "
                f"WHERE relation_kind = 'synonym' "
                f"AND (token IN ({ph}) OR target IN ({ph}))",
                params,
            ).fetchall()
        finally:
            conn.close()
        for r in rows:
            evidence_kind = r["evidence_kind"]
            a = (r["token"] or "").lower()
            b = (r["target"] or "").lower()
            # Skip NULL/empty endpoints and self-loops.
            if not a or not b or a == b:
                continue
            target_index = (
                manual if evidence_kind in {"manual", "manual_legacy"} else derived
            )
            # Sets make re-inserting an already-known edge a no-op.
            target_index.setdefault(a, set()).add(b)
            target_index.setdefault(b, set()).add(a)
        # Keep the documented cache shape: a queried token always has an
        # entry in both maps, empty when SQL returned nothing for it.
        for t in missing:
            manual.setdefault(t, set())
            derived.setdefault(t, set())
        # Only mark tokens as asked once the query has succeeded.
        queried.update(missing)
    return (
        {t: manual.get(t, set()) for t in qlower},
        {t: derived.get(t, set()) for t in qlower},
    )
def _load_rivalry_rows(shards_dir: Path) -> list[tuple[str, str]]:
    """Return the lowercased ``(token, target)`` rivalry rows for a directory.

    Results are memoized per resolved ``shards_dir`` in
    ``_RIVALRY_ROWS_CACHE`` and re-read when the shard mtime signature
    changes. Rows with an empty side, or whose two sides are equal after
    lowercasing, are dropped.
    """
    signature = _shards_mtime_signature(shards_dir)
    cache_key = str(shards_dir.resolve())
    entry = _RIVALRY_ROWS_CACHE.get(cache_key)
    if entry is not None and entry[0] == signature:
        return entry[1]

    conn = connect_query(shards_dir=shards_dir)
    try:
        rows = conn.execute(
            "SELECT token, target FROM concept_relations "
            "WHERE relation_kind = 'rivalry'"
        ).fetchall()
    finally:
        conn.close()

    normalized = (
        ((row["token"] or "").lower(), (row["target"] or "").lower())
        for row in rows
    )
    pairs = [
        (left, right)
        for left, right in normalized
        if left and right and left != right
    ]
    _RIVALRY_ROWS_CACHE[cache_key] = (signature, pairs)
    return pairs
def _load_idf_for(shards_dir: Path, tokens: set[str]) -> dict[str, int]:
    """Fetch summed ``doc_freq`` for just the given tokens.

    Only called when an expansion overflows ``max_total`` and has to be
    ranked for truncation, which avoids dumping the whole
    ``concept_token_idf`` table in the common case. Tokens are lowercased
    and falsy ones skipped; with nothing left to look up, no connection
    is opened and ``{}`` is returned. Tokens without a row are absent
    from the result.
    """
    wanted = {tok.lower() for tok in (tokens or ()) if tok}
    if not wanted:
        return {}
    marks = ",".join("?" * len(wanted))
    conn = connect_query(shards_dir=shards_dir)
    try:
        rows = conn.execute(
            f"SELECT token, SUM(doc_freq) AS df FROM concept_token_idf "
            f"WHERE token IN ({marks}) GROUP BY token",
            list(wanted),
        ).fetchall()
    finally:
        conn.close()
    result: dict[str, int] = {}
    for row in rows:
        result[(row["token"] or "").lower()] = int(row["df"] or 0)
    return result
def _load_neighbor_source_freq(
    shards_dir: Path, token: str, neighbors: set[str]
) -> dict[str, int]:
    """Count supporting documents for each ``(token, neighbor)`` synonym edge.

    Returns ``{neighbor: source_root_count}`` — the number of distinct
    ``source_root`` values among synonym rows whose ``token`` column is
    ``token`` and whose ``target`` is the neighbor. Callers use it to
    rank-and-truncate an over-cap expansion instead of hard-skipping a
    homonym acronym entirely (#000054 Phase 2a — CPU has 13 legitimate
    homonym expansions across the corpus, of which
    `central / processing / unit` are anchored by hundreds of documents
    and the noise tail by 1-2 each).

    The extractor emits bidirectional edges, so for any (a, b) pair both
    rows exist with token=a/target=b and token=b/target=a, each anchored
    to the doc that defined them. Counting on one direction captures the
    asymmetric assertion. #000054.

    Neighbors are lowercased and falsy ones skipped; with none left, no
    connection is opened and ``{}`` is returned. Neighbors without a
    matching row are absent from the result.
    """
    wanted = {name.lower() for name in (neighbors or ()) if name}
    if not wanted:
        return {}
    marks = ",".join("?" * len(wanted))
    conn = connect_query(shards_dir=shards_dir)
    try:
        rows = conn.execute(
            f"SELECT target, COUNT(DISTINCT source_root) AS cnt "
            f"FROM concept_relations "
            f"WHERE relation_kind = 'synonym' "
            f" AND token = ? AND target IN ({marks}) "
            f"GROUP BY target",
            [token.lower(), *wanted],
        ).fetchall()
    finally:
        conn.close()
    counts: dict[str, int] = {}
    for row in rows:
        counts[(row["target"] or "").lower()] = int(row["cnt"] or 0)
    return counts
# ---------------------------------------------------------------------------
# Public API — matches the legacy ``arborist.qa.concepts`` shape
# ---------------------------------------------------------------------------
# Caps that match the legacy (frozenset) expansion size. The corpus-
# derived synonym graph is noisier than hand-curated frozensets:
# generic tokens like "person" / "thoughts" / "language" have ~20+
# reciprocal-link neighbors each, most of which are topic-adjacency
# noise rather than actual synonyms. Without these caps a 19-token
# query expands to ~400 accept tokens, and the title-LIKE search
# multiplies that by the document count to a many-minute hang.
#
# MAX_NEIGHBORS_PER_TOKEN: default per-token cap on neighbors taken
# from the *derived* synonym index in ``synonym_expand`` (and from the
# strict neighbor set in ``synonym_expand_strict``). A token at or
# under the cap contributes all of its neighbors. A token over the cap
# is not skipped: its neighbors are ranked by how many distinct source
# documents assert the edge (ties alphabetical) and only the top
# MAX_NEIGHBORS_PER_TOKEN are kept. Manual (curated) neighbors in
# ``synonym_expand`` are not subject to this cap.
#
# MAX_TOTAL_TOKENS: default overall cap on the expanded set. Original
# query tokens are always preserved; once the total exceeds the cap,
# the added neighbors are ranked by ascending document frequency from
# concept_token_idf (tokens without a row count as 0, ties
# alphabetical) and truncated to the remaining budget. Bound on
# title-LIKE clause count keeps the SQL tractable on a 3.47M-doc corpus.
MAX_NEIGHBORS_PER_TOKEN = 8
MAX_TOTAL_TOKENS = 50
def _load_strict_neighbors_for(
    shards_dir: Path, tokens: set[str]
) -> dict[str, set[str]]:
    """Load only high-trust synonym neighbors for ``tokens``.

    High-trust means evidence kinds manual, manual_legacy and
    acronym_parens. ``link_reciprocity`` is excluded because reciprocal
    wikilinks express topical adjacency, not synonymy (a ``Dinosaurs``
    page reciprocally links to a ``Curious George Brigade`` page → an
    edge that should not amplify retrieval). The multiplicative
    title-purity rerank uses this strict view; the additive title-rerank
    + retrieval routes use the broader ``synonym_expand``.
    #000054 Phase 2b.

    Returns a dict with one entry per requested (lowercased, non-falsy)
    token mapping to its direct neighbors; tokens without rows map to an
    empty set. With no usable tokens, no connection is opened and ``{}``
    is returned. Nothing is cached.
    """
    wanted = {tok.lower() for tok in tokens if tok}
    if not wanted:
        return {}
    marks = ",".join("?" * len(wanted))
    # Bound once for the ``token IN`` clause and once for ``target IN``.
    bound = list(wanted) * 2
    conn = connect_query(shards_dir=shards_dir)
    try:
        rows = conn.execute(
            f"SELECT token, target FROM concept_relations "
            f"WHERE relation_kind = 'synonym' "
            f" AND evidence_kind IN ('manual', 'manual_legacy', 'acronym_parens') "
            f" AND (token IN ({marks}) OR target IN ({marks}))",
            bound,
        ).fetchall()
    finally:
        conn.close()

    neighbors: dict[str, set[str]] = {tok: set() for tok in wanted}
    for row in rows:
        left = (row["token"] or "").lower()
        right = (row["target"] or "").lower()
        if left and right and left != right:
            # Record the edge for whichever endpoint(s) were requested.
            for source, other in ((left, right), (right, left)):
                if source in neighbors:
                    neighbors[source].add(other)
    return neighbors
def synonym_expand_strict(
    tokens: set[str],
    *,
    shards_dir: Path | str | None = None,
    max_neighbors_per_token: int = MAX_NEIGHBORS_PER_TOKEN,
) -> set[str]:
    """Expand ``tokens`` using high-trust synonym edges only.

    Counterpart of ``synonym_expand`` limited to the evidence kinds
    selected by ``_load_strict_neighbors_for`` (manual / acronym_parens).
    The title-purity multiplier uses it: the broader ``synonym_expand``
    is right for retrieval (additive boosts), but its noisy
    ``link_reciprocity`` edges blow up under a multiplicative rank.
    #000054 Phase 2b.

    The result always contains the lowercased, non-falsy input tokens.
    Without ``shards_dir`` (or with no tokens) that is all it contains.
    A token with more than ``max_neighbors_per_token`` strict neighbors
    contributes only its top neighbors by source-document count, ties
    broken alphabetically.
    """
    if not tokens or shards_dir is None:
        return {tok.lower() for tok in tokens if tok}
    root = Path(shards_dir)
    seeds = {tok.lower() for tok in tokens if tok}
    strict = _load_strict_neighbors_for(root, seeds)
    result: set[str] = set(seeds)
    for seed in seeds:
        candidates = strict.get(seed, set())
        if not candidates:
            continue
        if len(candidates) > max_neighbors_per_token:
            # Frequency rank — same discipline as the broad path:
            # alphabetical first, then a stable sort by descending count.
            counts = _load_neighbor_source_freq(root, seed, candidates)
            ordered = sorted(candidates)
            ordered.sort(key=lambda name: -counts.get(name, 0))
            candidates = ordered[:max_neighbors_per_token]
        result.update(candidates)
    return result
[docs]
def synonym_expand(
    tokens: set[str],
    *,
    shards_dir: Path | str | None = None,
    max_neighbors_per_token: int = MAX_NEIGHBORS_PER_TOKEN,
    max_total: int = MAX_TOTAL_TOKENS,
) -> set[str]:
    """Expand query tokens with their direct synonym neighbors.

    Every returned token is lowercase, on every path — including the
    ``shards_dir=None`` short-circuit, which mirrors
    ``synonym_expand_strict``. Falsy tokens (``""``, ``None``) are
    ignored.

    Expansion rules:

    1. Manual (curated) neighbors of each token are always added.
    2. Derived (corpus-extracted) neighbors are bounded per token by
       ``max_neighbors_per_token``. A token at or under the cap adds all
       of its derived neighbors. A token over the cap is
       *ranked-and-truncated*, not skipped: neighbors are ordered by how
       many distinct source documents assert the edge (descending, ties
       alphabetical) and only the top ``max_neighbors_per_token`` are
       kept (#000054 Phase 2a).
    3. ``max_total`` bounds the overall result, which bounds the SQL
       clause count downstream. Original query tokens are always
       preserved. When the total exceeds the cap, added neighbors are
       ranked by ascending document frequency (rarer first; tokens
       missing from ``concept_token_idf`` count as rarest; ties
       alphabetical) and truncated to the remaining budget. If the query
       alone is at or over the cap, no neighbors are kept.

    Args:
        tokens: Query tokens; any case.
        shards_dir: Shard directory; ``None`` disables expansion.
        max_neighbors_per_token: Per-token cap on derived neighbors.
        max_total: Cap on the size of the returned set (not applied to
            the original query tokens themselves).

    Returns:
        A new set holding the lowercased query tokens plus the selected
        neighbors.
    """
    if not tokens:
        return set()
    qlower = {t.lower() for t in tokens if t}
    if shards_dir is None or not qlower:
        return qlower
    p = Path(shards_dir)
    # Lazy load — only the rows whose token or target appears in the
    # question, instead of scanning the whole concept_relations table.
    manual_index, derived_index = _load_neighbors_for(p, qlower)
    expanded: set[str] = set(qlower)

    # Manual (curated) synonyms always expand: brain-tech / AMD-family /
    # etc. seed groups have legitimately many members per anchor & we
    # trust the curation.
    for t in qlower:
        expanded |= manual_index.get(t, set())

    # Derived synonyms cap on per-token degree. Generic tokens like
    # "person" / "thoughts" / "language" have wide noisy neighborhoods in
    # the reciprocal-link graph; a homonym acronym ("CPU") should still
    # contribute its dominant expansion, so over-cap tokens are ranked by
    # source frequency rather than dropped. Below-cap tokens stay on the
    # fast path (no SQL).
    for t in qlower:
        neighbors = derived_index.get(t, set())
        if not neighbors:
            continue
        if len(neighbors) <= max_neighbors_per_token:
            expanded |= neighbors
            continue
        freq = _load_neighbor_source_freq(p, t, neighbors)
        ranked = sorted(neighbors, key=lambda n: (-freq.get(n, 0), n))
        expanded.update(ranked[:max_neighbors_per_token])

    if len(expanded) > max_total:
        neighbors_only = expanded - qlower
        # Lazy IDF — only fetched when we hit the cap. Most queries
        # never reach this branch.
        token_idf = _load_idf_for(p, neighbors_only)
        # Ascending doc_freq keeps the rarest first, so 0 is the
        # "rarest possible" score for tokens absent from the IDF table.
        # With an empty IDF table every score is 0 and the order
        # degenerates to alphabetical.
        sentinel_high_rarity = 0
        ranked = sorted(
            neighbors_only,
            key=lambda n: (token_idf.get(n, sentinel_high_rarity), n),
        )
        budget = max(0, max_total - len(qlower))
        expanded = qlower | set(ranked[:budget])
    return expanded
[docs]
def rivalry_excluded(
tokens: set[str],
*,
shards_dir: Path | str | None = None,
compare_phrasing: bool = False,
) -> set[str]:
"""Tokens whose presence in a doc title means EXCLUDE that doc.
For each rivalry pair (A, B): if exactly ONE side appears in the
query AND no comparison language was used, exclude the OTHER side's
tokens. If both sides appear, or if the user asked for a comparison,
no exclusion (they wanted both).
"""
if compare_phrasing or not tokens or shards_dir is None:
return set()
p = Path(shards_dir)
rivalry_rows = _load_rivalry_rows(p)
if not rivalry_rows:
return set()
qlower = {t.lower() for t in tokens}
# Build closures for both rivalry sides AND the question tokens,
# so the overlap test below is set-vs-set. Tokens we need:
# every token mentioned in rivalry rows + every question token.
# All loaded in one targeted query via _load_neighbors_for.
rivalry_tokens = {t for pair in rivalry_rows for t in pair}
manual_index, derived_index = _load_neighbors_for(
p, qlower | rivalry_tokens
)
excluded: set[str] = set()
for a, b in rivalry_rows:
ga = frozenset(
manual_index.get(a, set()) | derived_index.get(a, set()) | {a}
)
gb = frozenset(
manual_index.get(b, set()) | derived_index.get(b, set()) | {b}
)
a_in = bool(qlower & ga)
b_in = bool(qlower & gb)
if a_in and not b_in:
excluded |= gb
elif b_in and not a_in:
excluded |= ga
return excluded