Source code for arborist.concepts.store

"""DB read/write helpers for concept_relations.

All operations are scoped to a single shard connection. Cross-shard
queries live in ``arborist.concepts.query``.

Append-only by design: ``add_concept_relation`` uses INSERT OR IGNORE
on the UNIQUE (source_root, relation_kind, token, target, evidence_kind)
key, so re-derivation never duplicates rows. ``purge_by_evidence_kind``
is the only DELETE path and lets an operator revoke one extractor's
output without touching manual or other-extractor rows.
"""

from __future__ import annotations

import sqlite3
import time

# Allowed relation_kind values; the helpers in this module check against
# this tuple before running any SQL. The schema's CHECK constraint is
# expected to enforce the same set (schema not shown here — keep the two
# in sync), so validating in Python makes API misuse fail loudly with a
# ValueError at the helper level rather than as a SQLite error.
RELATION_KINDS = ("synonym", "antonym", "rivalry", "category")


def add_concept_relation(
    conn: sqlite3.Connection,
    *,
    source_root: str,
    relation_kind: str,
    token: str,
    target: str,
    evidence_kind: str,
    confidence: float = 1.0,
    derived_from: str | None = None,
    derived_at: int | None = None,
) -> bool:
    """Insert one concept relation unless an identical one already exists.

    The statement is ``INSERT OR IGNORE``, so calling this again with the
    same (source_root, relation_kind, token, target, evidence_kind)
    values is a no-op, assuming the table carries the UNIQUE key the
    module docstring describes.

    ``token`` and ``target`` are written exactly as passed in; no case
    folding happens on the write path, leaving normalization to readers.

    Args:
        conn: Connection to the shard that owns ``concept_relations``.
        source_root: Root the relation was derived from.
        relation_kind: One of ``RELATION_KINDS``.
        token: Left-hand term of the relation; must be non-empty.
        target: Right-hand term of the relation; must be non-empty.
        evidence_kind: Label of the extractor (or manual source) that
            produced the row.
        confidence: Stored as a float; defaults to ``1.0``.
        derived_from: Optional provenance string, stored as given.
        derived_at: Timestamp stored as an int; defaults to the current
            ``time.time()`` truncated to whole seconds.

    Returns:
        True when a row was inserted, False when the insert was ignored.

    Raises:
        ValueError: If ``relation_kind`` is not in ``RELATION_KINDS`` or
            if ``token`` or ``target`` is empty.
    """
    if relation_kind not in RELATION_KINDS:
        raise ValueError(
            f"relation_kind must be one of {RELATION_KINDS}, got {relation_kind!r}"
        )
    if not (token and target):
        raise ValueError("token and target must be non-empty")

    # Coerce in column order so a bad confidence is reported before a bad
    # derived_at.
    score = float(confidence)
    stamp = int(time.time() if derived_at is None else derived_at)

    statement = (
        "INSERT OR IGNORE INTO concept_relations "
        "(source_root, relation_kind, token, target, evidence_kind, "
        " confidence, derived_at, derived_from) "
        "VALUES (?, ?, ?, ?, ?, ?, ?, ?)"
    )
    values = (
        source_root,
        relation_kind,
        token,
        target,
        evidence_kind,
        score,
        stamp,
        derived_from,
    )
    # rowcount is 0 when the conflict clause skipped the insert.
    inserted = conn.execute(statement, values).rowcount
    return inserted > 0
[docs] def concept_relations_for_token( conn: sqlite3.Connection, token: str, *, relation_kind: str | None = None, ) -> list[dict]: """Return all relations whose ``token`` matches (case-insensitive). If ``relation_kind`` is given, filter to that kind.""" sql = ( "SELECT source_root, relation_kind, token, target, evidence_kind, " " confidence, derived_at, derived_from " "FROM concept_relations WHERE LOWER(token) = LOWER(?)" ) params: tuple = (token,) if relation_kind: if relation_kind not in RELATION_KINDS: raise ValueError( f"relation_kind must be one of {RELATION_KINDS}, got {relation_kind!r}" ) sql += " AND relation_kind = ?" params = params + (relation_kind,) return [dict(row) for row in conn.execute(sql, params).fetchall()]
[docs] def purge_by_evidence_kind( conn: sqlite3.Connection, evidence_kind: str, *, derived_from: str | None = None, ) -> int: """Delete every row with the given ``evidence_kind`` (and optional ``derived_from``). Returns the number of rows removed. The intended use: revoke a buggy extractor's output cleanly. Manual rows live under ``evidence_kind='manual'`` and are NOT touched by a purge of any other kind. """ sql = "DELETE FROM concept_relations WHERE evidence_kind = ?" params: tuple = (evidence_kind,) if derived_from is not None: sql += " AND derived_from = ?" params = params + (derived_from,) cursor = conn.execute(sql, params) return cursor.rowcount
def list_evidence_kinds(conn: sqlite3.Connection) -> list[tuple[str, int]]:
    """Return ``[(evidence_kind, row_count), ...]`` for the shard.

    Results are ordered by ``row_count`` descending; kinds with equal
    counts are ordered by ``evidence_kind`` ascending so the output is
    stable between calls. Useful for ``arborist concepts list --kinds``.

    Works regardless of ``conn.row_factory``: rows are read as plain
    tuples rather than indexed by column name.

    Args:
        conn: Connection to the shard that owns ``concept_relations``.

    Returns:
        One ``(evidence_kind, row_count)`` tuple per distinct kind; an
        empty list when the table has no rows.
    """
    # Clearing the cursor-level row factory gives tuples even when the
    # connection was configured with sqlite3.Row or a custom factory.
    cursor = conn.cursor()
    cursor.row_factory = None
    cursor.execute(
        "SELECT evidence_kind, COUNT(*) AS n "
        "FROM concept_relations GROUP BY evidence_kind "
        "ORDER BY n DESC, evidence_kind ASC"
    )
    return [(kind, count) for kind, count in cursor.fetchall()]