# Source code for arborist.concepts.query

"""Retrieval-time concept lookup. Cross-shard, read-only.

Public API matches the legacy ``arborist.qa.concepts`` shape so existing
call sites in ``query.py`` keep working unchanged. Behavior changes:

- Backed by the ``concept_relations`` SQLite table instead of in-Python
  frozensets.
- Walks every shard in ``shards_dir`` (same UNION pattern as cross-shard
  FTS5 search). Concept relations from shard 003 are visible to a query
  routed at shard 000 — exactly what we want for a 3.47M-doc corpus
  split across many shards.
- Token comparison is case-insensitive at the SQL layer (NOCASE on
  LOWER()). Stored capitalization is preserved.

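Cache: per-process dicts keyed on the resolved ``shards_dir`` path; each
entry is stamped with a signature built from every shard file's mtime.
Lookups in a hot loop don't re-walk shards. An entry is rebuilt when any
shard file's mtime changes (e.g. after ``arborist concepts derive``
writes new rows). There is no size-based eviction.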
"""

from __future__ import annotations

import re
import time
from pathlib import Path

from arborist.store import connect_query

# Words signalling that the user wants both sides of a rivalry shown.
# They live in code rather than in the DB because spotting compare
# phrasing is a query-classification task, not a corpus-derived signal.
COMPARE_WORDS: frozenset[str] = frozenset(
    (
        "vs",
        "versus",
        "compare",
        "compared",
        "comparison",
        "compares",
        "between",
        "difference",
        "differences",
        "or",
        "either",
    )
)


def has_compare_phrasing(question: str) -> bool:
    """Report whether ``question`` uses comparison language.

    The question is lower-cased and split into runs of ASCII letters;
    the result is True when at least one of those words is a member of
    ``COMPARE_WORDS``. A falsy question (``None``, ``""``) yields False.
    """
    text = question.lower() if question else ""
    found = re.findall(r"[a-z]+", text)
    return not COMPARE_WORDS.isdisjoint(found)
# ---------------------------------------------------------------------------
# Cross-shard lookup with mtime-keyed cache
# ---------------------------------------------------------------------------

# Eager-load cache (legacy) — kept for back-compat callers that still
# go through `_get_indices`. The hot path (synonym_expand,
# rivalry_excluded) uses the lazy caches below.
#
# Cache shape: { shards_dir_str: (mtime_sig, manual_index, derived_index, rivalry_pairs, token_idf) }
#   - manual_index:  curated synonym edges; always expanded
#   - derived_index: corpus-derived synonym edges; expansion subject to per-token cap
#   - rivalry_pairs: list of (set_a, set_b) — frozensets of lowercase tokens
#   - token_idf:     { token_lower: doc_freq } summed across shards, used for cap-time ranking
_CACHE: dict[str, tuple[tuple, dict, dict, list, dict]] = {}

# Lazy per-token neighbor cache (#000026 follow-up). The eager loader
# pulled all 290K concept_relations rows on first call (~2.4 s) so a
# single CLI query paid that cost up front; benchmarks amortize it.
# We don't actually need the full graph — `synonym_expand` only ever
# touches direct neighbors of the question's tokens. For a 5-token
# question that's <300 rows, queryable in ~300 ms total via
# WHERE token IN (...) OR target IN (...).
#
# Shape: { shards_dir_str: { "sig": mtime_signature, "manual": {token: {neighbors}},
#                            "derived": {token: {neighbors}} } }
# A token is "known" once we've queried for it; the empty-set claim
# means "we asked SQL and got nothing back" — distinguishes from
# "we haven't asked yet."
_NEIGHBOR_CACHE: dict[str, dict] = {}

# Process-wide rivalry-row cache. The corpus has ~2 rivalry rows
# total today, so this is essentially free; cache lets repeated calls
# skip the round-trip entirely.
# Shape: { shards_dir_str: (mtime_sig, [(token, target), ...]) }
_RIVALRY_ROWS_CACHE: dict[str, tuple[tuple, list[tuple[str, str]]]] = {}


def _shards_mtime_signature(shards_dir: Path) -> tuple:
    """Return a tuple of ``(resolved_path, mtime_ns)`` for every ``*.db``
    file directly inside ``shards_dir``, ordered by sorted path.

    Returns the empty tuple when ``shards_dir`` is not a directory. The
    signature is stable across calls as long as no shard file is added,
    removed, or has its mtime changed, which is what the caches in this
    module compare against to decide whether to reload.
    """
    if not shards_dir.is_dir():
        return ()
    return tuple(
        (str(p.resolve()), p.stat().st_mtime_ns)
        for p in sorted(shards_dir.glob("*.db"))
    )


def _load_token_idf(shards_dir: Path) -> dict[str, int]:
    """Sum per-token doc_freq across all shards.

    The concept_token_idf table is per-shard (the `chunks_fts` index it
    derives from is per-shard), so cross-shard ranking aggregates via
    SUM over the UNION view exposed by connect_query.

    Keys are lower-cased. Rows whose tokens differ only by case are
    *added together* rather than overwriting one another, so the value
    is a true sum for the folded token.

    Tokens not in any shard's table get NO entry; callers default to
    "treat as rare" via dict.get() with a high-rarity default.
    """
    conn = connect_query(shards_dir=shards_dir)
    try:
        rows = conn.execute(
            "SELECT token, SUM(doc_freq) AS df FROM concept_token_idf GROUP BY token"
        ).fetchall()
    finally:
        # Always release the connection, even when the query raises
        # (e.g. the table is missing) — same discipline as the lazy loaders.
        conn.close()

    totals: dict[str, int] = {}
    for r in rows:
        token = (r["token"] or "").lower()
        totals[token] = totals.get(token, 0) + int(r["df"] or 0)
    return totals


def _load_indices(shards_dir: Path) -> tuple[dict, dict, list]:
    """Walk all shards, materialize the synonym & rivalry indices.

    Returns ``(manual_index, derived_index, rivalry_pairs)``.

    Synonyms map every token to its **direct neighbors only**, NOT the
    transitive closure across reciprocal-link chains. Why: union-find
    over the full Wikipedia reciprocal-link graph collapses everything
    into one giant connected component (54k+ tokens for any seed in a
    4-shard corpus). Direct-neighbor expansion preserves the legacy
    frozenset semantics (every member of a group expanded to the others
    in that group, but the groups didn't chain).

    Two synonym indices are built, one per evidence class:

    - ``manual_index`` — manual_legacy + manual rows. Curated; always
      expanded regardless of per-token degree. Captures the brain-tech /
      AMD-family / etc. seed groups whose anchor token has many
      deliberate members (e.g. telepathy → 29 members of the brain-tech
      group).
    - ``derived_index`` — link_reciprocity & other corpus-derived
      extractors. Subject to per-token degree cap because the Wikipedia
      link graph carries topic-adjacency noise on generic tokens
      (person, thoughts, language).

    Rivalry pairs use the union of both indices for closure. All tokens
    are lower-cased; rows with an empty side or a self-edge are skipped.
    """
    conn = connect_query(shards_dir=shards_dir)
    try:
        rows = conn.execute(
            "SELECT relation_kind, evidence_kind, token, target FROM concept_relations"
        ).fetchall()
    finally:
        # Close on every path so a failed scan does not leak the handle.
        conn.close()

    manual_index: dict[str, set[str]] = {}
    derived_index: dict[str, set[str]] = {}
    rivalry_rows: list[tuple[str, str]] = []

    # Curated evidence kinds — never capped at expansion time.
    MANUAL_KINDS = {"manual", "manual_legacy"}

    for r in rows:
        kind = r["relation_kind"]
        evidence_kind = r["evidence_kind"]
        a = (r["token"] or "").lower()
        b = (r["target"] or "").lower()
        if not a or not b or a == b:
            continue
        if kind == "synonym":
            target_index = (
                manual_index if evidence_kind in MANUAL_KINDS else derived_index
            )
            # Edges are stored symmetrically: each side lists the other.
            target_index.setdefault(a, set()).add(b)
            target_index.setdefault(b, set()).add(a)
        elif kind == "rivalry":
            rivalry_rows.append((a, b))

    # Rivalry pairs use union of manual + derived neighborhoods.
    rivalry_pairs: list[tuple[frozenset[str], frozenset[str]]] = []
    for a, b in rivalry_rows:
        ga = frozenset(
            manual_index.get(a, set()) | derived_index.get(a, set()) | {a}
        )
        gb = frozenset(
            manual_index.get(b, set()) | derived_index.get(b, set()) | {b}
        )
        rivalry_pairs.append((ga, gb))

    return manual_index, derived_index, rivalry_pairs


def _get_indices(shards_dir: Path | str | None) -> tuple[dict, dict, list, dict]:
    """Return (manual_index, derived_index, rivalry_pairs, token_idf),
    using cached values when the shard mtime signature is unchanged.

    ``shards_dir=None`` short-circuits to four empty containers without
    touching the cache or the database. Otherwise the cache entry is
    keyed on the resolved directory path and rebuilt whenever the
    current mtime signature differs from the stored one.
    """
    if shards_dir is None:
        return {}, {}, [], {}
    p = Path(shards_dir)
    key = str(p.resolve())
    sig = _shards_mtime_signature(p)
    cached = _CACHE.get(key)
    if cached is not None and cached[0] == sig:
        return cached[1], cached[2], cached[3], cached[4]
    manual, derived, riv = _load_indices(p)
    idf = _load_token_idf(p)
    _CACHE[key] = (sig, manual, derived, riv, idf)
    return manual, derived, riv, idf
def invalidate_cache() -> None:
    """Empty every module-level cache in one go.

    Intended for the same-process write-then-read pattern: the mtime
    check would notice new rows on the next read anyway, but clearing
    explicitly right after a writer commits is faster.
    """
    for cache in (_CACHE, _NEIGHBOR_CACHE, _RIVALRY_ROWS_CACHE):
        cache.clear()
# ---------------------------------------------------------------------------
# Lazy neighbor loading — the hot path for synonym_expand / rivalry_excluded
# ---------------------------------------------------------------------------


def _ensure_neighbor_cache(shards_dir: Path) -> dict:
    """Return the live neighbor cache dict for `shards_dir`.

    The entry is keyed on the resolved directory path. It is dropped &
    re-initialized (empty ``manual`` / ``derived`` maps) if any shard's
    mtime has changed since last load. The returned dict is the cache
    entry itself, so callers mutate it in place.
    """
    sig = _shards_mtime_signature(shards_dir)
    key = str(shards_dir.resolve())
    cached = _NEIGHBOR_CACHE.get(key)
    if cached is None or cached["sig"] != sig:
        _NEIGHBOR_CACHE[key] = {"sig": sig, "manual": {}, "derived": {}}
    return _NEIGHBOR_CACHE[key]


def _absorb_neighbor_rows(cache: dict, rows, asked: set[str]) -> None:
    """Fold freshly fetched synonym ``rows`` into ``cache`` for the tokens
    in ``asked`` (lower-cased tokens that were just queried).

    Only entries for *asked* tokens are written. An edge (a, b) fetched
    because ``a`` was asked must not create an entry for ``b``: key
    presence is how the loader decides a token has already been queried,
    so a partial entry for ``b`` would make a later lookup of ``b`` skip
    SQL and return an incomplete neighbor set.

    Every asked token ends up with an entry in both indices (possibly
    empty) so zero-row results are remembered too.
    """
    manual_kinds = {"manual", "manual_legacy"}
    manual = cache["manual"]
    derived = cache["derived"]
    for r in rows:
        a = (r["token"] or "").lower()
        b = (r["target"] or "").lower()
        if not a or not b or a == b:
            continue
        index = manual if r["evidence_kind"] in manual_kinds else derived
        if a in asked:
            index.setdefault(a, set()).add(b)
        if b in asked:
            index.setdefault(b, set()).add(a)
    # Mark every queried token as "we've asked" so we don't re-query
    # empty results.
    for t in asked:
        manual.setdefault(t, set())
        derived.setdefault(t, set())


def _load_neighbors_for(
    shards_dir: Path, tokens: set[str]
) -> tuple[dict[str, set[str]], dict[str, set[str]]]:
    """Lazy targeted loader. Returns (manual_subset, derived_subset)
    mapping just the requested tokens to their direct neighbors.

    Hits SQL only for tokens we haven't yet asked about in this process.
    Once a token has been queried (even returning zero rows), we cache
    the empty set so subsequent calls don't re-query.

    Tokens are lower-cased and falsy ones dropped. The returned sets are
    the cached sets themselves — treat them as read-only.
    """
    cache = _ensure_neighbor_cache(shards_dir)
    qlower = {t.lower() for t in tokens if t}
    # A token is "known" iff it has an entry; _absorb_neighbor_rows only
    # ever creates entries for tokens that were actually queried.
    missing = sorted(
        t for t in qlower
        if t not in cache["manual"] and t not in cache["derived"]
    )
    if missing:
        ph = ",".join("?" * len(missing))
        # The placeholder list appears twice in the SQL (token / target).
        params = missing + missing
        conn = connect_query(shards_dir=shards_dir)
        try:
            rows = conn.execute(
                f"SELECT relation_kind, evidence_kind, token, target "
                f"FROM concept_relations "
                f"WHERE relation_kind = 'synonym' "
                f"AND (token IN ({ph}) OR target IN ({ph}))",
                params,
            ).fetchall()
        finally:
            conn.close()
        _absorb_neighbor_rows(cache, rows, set(missing))
    return (
        {t: cache["manual"].get(t, set()) for t in qlower},
        {t: cache["derived"].get(t, set()) for t in qlower},
    )


def _load_rivalry_rows(shards_dir: Path) -> list[tuple[str, str]]:
    """Process-wide cache of rivalry-relation rows for one shards_dir.

    Returns a list of lower-cased ``(token, target)`` pairs, skipping
    rows with an empty side or identical sides. Refreshed when the shard
    mtime signature changes. The corpus has ~2 rivalry rows total today;
    this is essentially zero-cost after the first call.
    """
    sig = _shards_mtime_signature(shards_dir)
    key = str(shards_dir.resolve())
    cached = _RIVALRY_ROWS_CACHE.get(key)
    if cached is not None and cached[0] == sig:
        return cached[1]
    conn = connect_query(shards_dir=shards_dir)
    try:
        rows = conn.execute(
            "SELECT token, target FROM concept_relations "
            "WHERE relation_kind = 'rivalry'"
        ).fetchall()
    finally:
        conn.close()
    pairs = []
    for r in rows:
        a = (r["token"] or "").lower()
        b = (r["target"] or "").lower()
        if a and b and a != b:
            pairs.append((a, b))
    _RIVALRY_ROWS_CACHE[key] = (sig, pairs)
    return pairs


def _load_idf_for(shards_dir: Path, tokens: set[str]) -> dict[str, int]:
    """Lazy IDF loader — only fetched when expansion exceeds
    ``max_total`` and we need to rank for truncation. Avoids the
    66 K-row dump on the common case (small expansions stay under
    the cap).

    Returns ``{token_lower: summed doc_freq}`` for the requested tokens
    that have rows; tokens without rows are simply absent. Empty input
    returns ``{}`` without opening a connection.
    """
    if not tokens:
        return {}
    qlower = {t.lower() for t in tokens if t}
    if not qlower:
        return {}
    ph = ",".join("?" * len(qlower))
    conn = connect_query(shards_dir=shards_dir)
    try:
        rows = conn.execute(
            f"SELECT token, SUM(doc_freq) AS df FROM concept_token_idf "
            f"WHERE token IN ({ph}) GROUP BY token",
            list(qlower),
        ).fetchall()
    finally:
        conn.close()
    return {(r["token"] or "").lower(): int(r["df"] or 0) for r in rows}


def _load_neighbor_source_freq(
    shards_dir: Path, token: str, neighbors: set[str]
) -> dict[str, int]:
    """For one ``token`` whose derived-synonym degree exceeds the
    per-token cap, return ``{neighbor: source_root_count}`` — how many
    distinct documents asserted the (token, neighbor) synonym relation.

    Used to rank-and-truncate over-cap derived expansions instead of
    hard-skipping a homonym acronym entirely (#000054 Phase 2a — CPU has
    13 legitimate homonym expansions across the corpus, of which
    `central / processing / unit` are anchored by hundreds of documents
    and the noise tail by 1-2 each).

    The extractor emits bidirectional edges, so for any (a, b) pair both
    rows exist with token=a/target=b and token=b/target=a, each anchored
    to the doc that defined them. Counting on one direction captures the
    asymmetric assertion. #000054.

    Neighbors with no matching row are absent from the result; callers
    treat them as count 0. Empty ``neighbors`` returns ``{}`` without
    opening a connection.
    """
    if not neighbors:
        return {}
    nlower = {n.lower() for n in neighbors if n}
    if not nlower:
        return {}
    ph = ",".join("?" * len(nlower))
    conn = connect_query(shards_dir=shards_dir)
    try:
        rows = conn.execute(
            f"SELECT target, COUNT(DISTINCT source_root) AS cnt "
            f"FROM concept_relations "
            f"WHERE relation_kind = 'synonym' "
            f" AND token = ? AND target IN ({ph}) "
            f"GROUP BY target",
            [token.lower()] + list(nlower),
        ).fetchall()
    finally:
        conn.close()
    return {(r["target"] or "").lower(): int(r["cnt"] or 0) for r in rows}


# ---------------------------------------------------------------------------
# Public API — matches the legacy ``arborist.qa.concepts`` shape
# ---------------------------------------------------------------------------

# Caps that match the legacy (frozenset) expansion size. The corpus-
# derived synonym graph is noisier than hand-curated frozensets:
# generic tokens like "person" / "thoughts" / "language" have ~20+
# reciprocal-link neighbors each, most of which are topic-adjacency
# noise rather than actual synonyms. Without these caps a 19-token
# query expands to ~400 accept tokens, and the title-LIKE search
# multiplies that by the document count to a many-minute hang.
#
# MAX_NEIGHBORS_PER_TOKEN: per-token cap on neighbors. In
# `synonym_expand` it applies to *derived* neighbors only (curated
# neighbors always expand); in `synonym_expand_strict` it applies to
# all strict neighbors. A token over the cap is not skipped: its
# neighbors are ranked by how many distinct source documents assert
# the edge (descending, ties alphabetical) and only the top N are kept.
#
# MAX_TOTAL_TOKENS: overall cap on the expanded set. Original query
# tokens are always preserved; once total exceeds the cap, neighbors
# are ranked by ascending document frequency (rarest first, ties
# alphabetical) & truncated. Bound on title-LIKE clause count keeps
# the SQL tractable on a 3.47M-doc corpus.
MAX_NEIGHBORS_PER_TOKEN = 8
MAX_TOTAL_TOKENS = 50


def _load_strict_neighbors_for(
    shards_dir: Path, tokens: set[str]
) -> dict[str, set[str]]:
    """Fetch only the high-trust synonym neighbors of ``tokens``.

    "High-trust" means evidence kinds manual, manual_legacy and
    acronym_parens. ``link_reciprocity`` is left out on purpose:
    reciprocal wikilinks signal topical adjacency rather than synonymy
    (a ``Dinosaurs`` page and a ``Curious George Brigade`` page linking
    to each other is not an edge that should amplify retrieval). The
    multiplicative title-purity rerank consumes this strict view, while
    the additive title-rerank and retrieval routes use the broader
    ``synonym_expand``. #000054 Phase 2b.

    The result maps every requested (lower-cased, non-empty) token to a
    set of lower-cased neighbors, empty when none were found. No
    connection is opened when there is nothing to look up.
    """
    wanted = {tok.lower() for tok in tokens if tok}
    if not wanted:
        return {}

    ordered = list(wanted)
    ph = ",".join(["?"] * len(ordered))
    conn = connect_query(shards_dir=shards_dir)
    try:
        cursor = conn.execute(
            f"SELECT token, target FROM concept_relations "
            f"WHERE relation_kind = 'synonym' "
            f"  AND evidence_kind IN ('manual', 'manual_legacy', 'acronym_parens') "
            f"  AND (token IN ({ph}) OR target IN ({ph}))",
            # Same placeholder list is bound twice: token side, target side.
            ordered * 2,
        )
        rows = cursor.fetchall()
    finally:
        conn.close()

    neighbors: dict[str, set[str]] = {tok: set() for tok in wanted}
    for row in rows:
        left = (row["token"] or "").lower()
        right = (row["target"] or "").lower()
        if left and right and left != right:
            # Record the edge only on sides that were actually requested.
            for src, dst in ((left, right), (right, left)):
                if src in neighbors:
                    neighbors[src].add(dst)
    return neighbors


def synonym_expand_strict(
    tokens: set[str],
    *,
    shards_dir: Path | str | None = None,
    max_neighbors_per_token: int = MAX_NEIGHBORS_PER_TOKEN,
) -> set[str]:
    """Expand ``tokens`` using high-trust synonym edges only.

    Counterpart of ``synonym_expand`` limited to the evidence kinds
    served by ``_load_strict_neighbors_for`` (manual / acronym_parens).
    It feeds the title-purity multiplier: the broad expansion suits
    retrieval's additive boosts, but its noisy ``link_reciprocity``
    edges blow up under a multiplicative rank. #000054 Phase 2b.

    With no tokens or no ``shards_dir`` the lower-cased, non-empty input
    tokens are returned as-is. Otherwise each token contributes all of
    its strict neighbors, or — when it has more than
    ``max_neighbors_per_token`` — only the top ones ranked by source
    frequency (descending, ties alphabetical).
    """
    base = {tok.lower() for tok in tokens if tok}
    if not tokens or shards_dir is None:
        return base

    root = Path(shards_dir)
    strict = _load_strict_neighbors_for(root, base)
    result: set[str] = set(base)
    for tok in base:
        candidates = strict.get(tok, set())
        if not candidates:
            continue
        if len(candidates) > max_neighbors_per_token:
            # Frequency rank — same discipline as the broad path.
            counts = _load_neighbor_source_freq(root, tok, candidates)
            by_rank = sorted(candidates, key=lambda n: (-counts.get(n, 0), n))
            result.update(by_rank[:max_neighbors_per_token])
        else:
            result.update(candidates)
    return result
def synonym_expand(
    tokens: set[str],
    *,
    shards_dir: Path | str | None = None,
    max_neighbors_per_token: int = MAX_NEIGHBORS_PER_TOKEN,
    max_total: int = MAX_TOTAL_TOKENS,
) -> set[str]:
    """Add direct synonym neighbors for the input tokens.

    Returns the lower-cased query tokens plus their direct neighbors.
    Falsy tokens (``None``, ``""``) are ignored. With no tokens the
    result is an empty set; with ``shards_dir=None`` there is nothing to
    look up and the input tokens are returned unchanged (not
    lower-cased).

    Curated (manual) neighbors always expand. Two caps protect retrieval
    performance & quality:

    1. ``max_neighbors_per_token`` — applies to corpus-derived
       neighbors. Generic tokens ("person", "thoughts") have huge degree
       in the Wikipedia reciprocal-link graph; expanding them fully
       dumps random topical-cluster noise. A token over the cap keeps
       only its top neighbors ranked by source frequency (descending,
       ties alphabetical). Specific named entities ("athlon",
       "telepathy") have small focused neighborhoods that pass the cap
       untouched.
    2. ``max_total`` — overall cap on expanded set size. Bounds the SQL
       clause count downstream. Original query tokens are always
       preserved; if total > cap, neighbors (curated and derived alike)
       are ranked by ascending document frequency, ties alphabetical,
       and truncated to the remaining budget.
    """
    if not tokens:
        return set()
    if shards_dir is None:
        return set(tokens)
    p = Path(shards_dir)
    # Skip falsy tokens: None has no .lower(), and an empty token would
    # otherwise ride along into the expanded set.
    qlower = {t.lower() for t in tokens if t}
    # Lazy load — only the rows whose token or target appears in the
    # question. Drops first-call cost from ~2.4 s (290 K-row scan +
    # Python iteration) to ~0.3 s (targeted WHERE-IN query, ~300 rows).
    manual_index, derived_index = _load_neighbors_for(p, qlower)
    expanded: set[str] = set(qlower)
    # Manual (curated) synonyms always expand: brain-tech / AMD-family /
    # etc. seed groups have legitimately many members per anchor & we
    # trust the curation.
    for t in qlower:
        expanded |= manual_index.get(t, set())
    # Derived (corpus-extracted) synonyms cap on per-token degree.
    # Generic tokens like "person" / "thoughts" / "language" have wide
    # noisy neighborhoods in the reciprocal-link graph; specific tokens
    # have bounded ones. When the per-token degree exceeds the cap we
    # *rank-and-truncate* by source-frequency rather than hard-skipping
    # (#000054 Phase 2a) — a homonym acronym ("CPU" has 13 expansions
    # across the corpus: central/processing/unit anchored by hundreds
    # of docs, canadian/contract/pharmaceutical by 1-2 each) should
    # still contribute its dominant expansion, not nothing. The
    # frequency rank surfaces the canonical pairing and drops the
    # noise tail. Below-cap tokens stay on the fast path (no SQL).
    for t in qlower:
        neighbors = derived_index.get(t, set())
        if not neighbors:
            continue
        if len(neighbors) <= max_neighbors_per_token:
            expanded |= neighbors
            continue
        # Over the per-token cap: rank by source-frequency (desc),
        # keep the top ``max_neighbors_per_token``.
        freq = _load_neighbor_source_freq(p, t, neighbors)
        ranked = sorted(neighbors, key=lambda n: (-freq.get(n, 0), n))
        expanded |= set(ranked[:max_neighbors_per_token])
    if len(expanded) > max_total:
        # IDF-rank the neighbors (rarer = more topical = keep first).
        # Tokens absent from concept_token_idf get a sentinel high-
        # rarity score so a hapax doesn't lose to a known-common
        # token at the truncation boundary. When the IDF table is
        # empty (backfill not run yet), ranking degenerates to
        # alphabetical (fallback compatibility).
        neighbors_only = expanded - qlower
        # Lazy IDF — only fetched when we hit the cap. Most queries
        # never reach this branch.
        token_idf = _load_idf_for(p, neighbors_only)
        SENTINEL_HIGH_RARITY = 0
        ranked = sorted(
            neighbors_only,
            key=lambda t: (token_idf.get(t, SENTINEL_HIGH_RARITY), t),
        )
        # Query tokens never count against the neighbor budget.
        budget = max(0, max_total - len(qlower))
        expanded = qlower | set(ranked[:budget])
    return expanded
def _exclusions_for_pairs(
    qlower: set[str],
    rivalry_rows: list[tuple[str, str]],
    manual_index: dict[str, set[str]],
    derived_index: dict[str, set[str]],
) -> set[str]:
    """Compute the exclusion set for already-loaded rivalry data.

    For each rivalry row (a, b) the two sides are closed over their
    manual + derived neighbors. When the query touches exactly one side,
    the other side's tokens are excluded — minus any token that also
    belongs to the queried side, so a neighbor shared by both rivals
    never knocks out documents about the side the user asked for.
    """
    excluded: set[str] = set()
    for a, b in rivalry_rows:
        ga = manual_index.get(a, set()) | derived_index.get(a, set()) | {a}
        gb = manual_index.get(b, set()) | derived_index.get(b, set()) | {b}
        a_in = not qlower.isdisjoint(ga)
        b_in = not qlower.isdisjoint(gb)
        if a_in and not b_in:
            excluded |= gb - ga
        elif b_in and not a_in:
            excluded |= ga - gb
    return excluded


def rivalry_excluded(
    tokens: set[str],
    *,
    shards_dir: Path | str | None = None,
    compare_phrasing: bool = False,
) -> set[str]:
    """Tokens whose presence in a doc title means EXCLUDE that doc.

    For each rivalry pair (A, B): if exactly ONE side appears in the
    query AND no comparison language was used, exclude the OTHER side's
    tokens. If both sides appear, or if the user asked for a comparison,
    no exclusion (they wanted both). Tokens that are neighbors of both
    sides are never excluded.

    Returns an empty set when ``compare_phrasing`` is true, when there
    are no (non-empty) tokens, when ``shards_dir`` is ``None``, or when
    no rivalry rows exist. Returned tokens are lower-cased.
    """
    if compare_phrasing or not tokens or shards_dir is None:
        return set()
    p = Path(shards_dir)
    rivalry_rows = _load_rivalry_rows(p)
    if not rivalry_rows:
        return set()
    # Falsy tokens carry no signal and None has no .lower().
    qlower = {t.lower() for t in tokens if t}
    if not qlower:
        return set()
    # Build closures for both rivalry sides AND the question tokens,
    # so the overlap test is set-vs-set. Tokens we need: every token
    # mentioned in rivalry rows + every question token. All loaded in
    # one targeted query via _load_neighbors_for.
    rivalry_tokens = {t for pair in rivalry_rows for t in pair}
    manual_index, derived_index = _load_neighbors_for(
        p, qlower | rivalry_tokens
    )
    return _exclusions_for_pairs(qlower, rivalry_rows, manual_index, derived_index)