# Source code for arborist.evict

"""Reversible eviction + rehydrate.

Implements the systematic-forgetting mechanic from the design philosophy:

- evict_to_cold: surface chunks demote from `hot` to `cold`; content set to
  NULL, FTS5 row deleted. leaf_hash retained — identity preserved.
- rehydrate: refetch URI through the same source pipeline, re-chunk with the
  original chunking_version, compare leaves and root. Match -> content
  restored, tier hot. Mismatch -> drift event in audit chain, and the
  source's live providence records flipped to
  falsification_state='stale'. No content restored.

Cores never evict: only chunks of `surface` documents are demoted.
"""

from __future__ import annotations

import sqlite3
import time
from typing import Callable, Iterable

from arborist.compress import pack_chunk
from arborist.document import canonicalize, get_chunker
from arborist.merkle import MerkleTree, hash_leaf
from arborist.store import append_audit, transaction


# Re-fetcher signature: takes a document URI and returns its parsed,
# canonicalized text, or None when nothing could be fetched (rehydrate
# reports a None result as ``fetch_failed``).
Fetcher = Callable[[str], str | None]


def _default_html_fetcher(uri: str) -> str | None:
    """Fetch *uri* through ``HtmlPageSource`` so rehydrate runs the ingest pipeline.

    Returns the ``content`` of the first document the source yields, or None
    when the HTML source module cannot be imported or yields no documents.
    """
    try:
        from arborist.sources.html_page import HtmlPageSource
    except ImportError:
        # HTML source unavailable: report "no content" instead of raising.
        return None
    documents = iter(HtmlPageSource([uri]).iter_documents())
    no_document = object()
    first = next(documents, no_document)
    return None if first is no_document else first.content


# Built-in re-fetchers keyed by ``documents.source_type``; rehydrate falls
# back to this table when the caller does not pass an explicit fetcher.
_FETCHERS: dict[str, Fetcher] = dict(html=_default_html_fetcher)


def evict_to_cold(
    conn: sqlite3.Connection,
    *,
    source_type: str | None = None,
    older_than_days: int | None = None,
    document_roots: Iterable[str] | None = None,
) -> dict:
    """Demote matching surface chunks from hot to cold.

    Cores are never evicted: only chunks whose document has
    ``kind = 'surface'`` and whose own ``tier`` is ``'hot'`` are considered.
    For each selected chunk the FTS row is removed, ``content`` is set to
    NULL and ``tier`` becomes ``'cold'``. ``leaf_hash`` is not touched.

    Args:
        conn: Open connection. Rows are read by column name, so it is
            expected to use a name-indexable row factory such as
            ``sqlite3.Row``.
        source_type: If truthy, only documents with this ``source_type``.
        older_than_days: If given, only documents whose ``ingest_ts`` is
            earlier than now (Unix seconds) minus this many days.
        document_roots: If given, restrict eviction to these documents; an
            empty iterable evicts nothing.

    Returns:
        ``{"evicted_chunks": int, "documents_affected": int}``.
    """
    where = ["d.kind = 'surface'", "c.tier = 'hot'"]
    params: list[object] = []
    if source_type:
        where.append("d.source_type = ?")
        params.append(source_type)
    if older_than_days is not None:
        cutoff = int(time.time()) - older_than_days * 86400
        where.append("d.ingest_ts < ?")
        params.append(cutoff)
    if document_roots is not None:
        roots = list(document_roots)
        if not roots:
            return {"evicted_chunks": 0, "documents_affected": 0}
        # NOTE(review): one placeholder per root; very large lists can exceed
        # SQLite's host-parameter limit (SQLITE_MAX_VARIABLE_NUMBER).
        placeholders = ",".join("?" for _ in roots)
        where.append(f"c.document_root IN ({placeholders})")
        params.extend(roots)

    # chunk_id is selected together with the candidates so the eviction loop
    # needs no per-chunk lookup to address the FTS row.
    sql = (
        "SELECT c.chunk_id, c.document_root, c.idx FROM chunks c "
        "JOIN documents d ON d.document_root = c.document_root "
        "WHERE " + " AND ".join(where)
    )
    candidates = conn.execute(sql, params).fetchall()
    if not candidates:
        return {"evicted_chunks": 0, "documents_affected": 0}

    per_doc: dict[str, int] = {}
    with transaction(conn):
        for r in candidates:
            doc_root = r["document_root"]
            # FTS5 rows are addressed by rowid (== chunks.chunk_id). The index
            # entry is removed before the chunk's content is NULLed.
            conn.execute(
                "DELETE FROM chunks_fts WHERE rowid=?",
                (r["chunk_id"],),
            )
            conn.execute(
                "UPDATE chunks SET content=NULL, tier='cold' "
                "WHERE document_root=? AND idx=?",
                (doc_root, r["idx"]),
            )
            per_doc[doc_root] = per_doc.get(doc_root, 0) + 1
        # One audit event per affected document, written in the same
        # transaction as the demotions.
        for doc_root, n in per_doc.items():
            append_audit(
                conn,
                event_type="evict_cold",
                subject_root=doc_root,
                body={"chunks_evicted": n},
            )
    return {
        "evicted_chunks": len(candidates),
        "documents_affected": len(per_doc),
    }
def rehydrate(
    conn: sqlite3.Connection,
    document_root: str,
    *,
    fetcher: Fetcher | None = None,
) -> dict:
    """Refetch URI, verify leaves, restore content if and only if root matches.

    The document's URI is fetched again, canonicalized and re-chunked with the
    ``chunking_version`` recorded for it, and a Merkle root is rebuilt from the
    chunk leaves. Content is restored only when that root equals
    *document_root*. Otherwise a ``rehydrate_drift`` audit event is appended
    and the source's ``live`` providence_cache rows are marked ``stale``.

    Args:
        conn: Open connection. Rows are read by column name, so it is expected
            to use a name-indexable row factory such as ``sqlite3.Row``.
        document_root: Hex root hash identifying the document.
        fetcher: Optional callable mapping the document URI to text. When
            None, the built-in fetcher registered for the document's
            ``source_type`` is used.

    Returns:
        A dict whose ``status`` is one of ``unknown_document``,
        ``nothing_to_do``, ``source_not_rehydratable``, ``fetch_failed``,
        ``drift_detected``, or ``rehydrated``, plus status-specific details.
    """
    doc_row = conn.execute(
        "SELECT document_uri, source_type, chunking_version "
        "FROM documents WHERE document_root = ?",
        (document_root,),
    ).fetchone()
    if doc_row is None:
        return {"status": "unknown_document"}

    # chunk_id is fetched here so restoring a chunk needs no extra per-chunk
    # lookup to address its FTS row.
    cold_chunks = conn.execute(
        "SELECT chunk_id, idx, leaf_hash FROM chunks "
        "WHERE document_root = ? AND tier = 'cold' ORDER BY idx",
        (document_root,),
    ).fetchall()
    if not cold_chunks:
        return {"status": "nothing_to_do", "cold_chunks": 0}

    # A caller-supplied fetcher always wins. Test against None rather than
    # truthiness so a falsy callable is not silently replaced.
    if fetcher is not None:
        use_fetcher = fetcher
    else:
        use_fetcher = _FETCHERS.get(doc_row["source_type"])
    if use_fetcher is None:
        return {
            "status": "source_not_rehydratable",
            "source_type": doc_row["source_type"],
        }

    uri = doc_row["document_uri"]
    try:
        text = use_fetcher(uri)
    except Exception as e:  # noqa: BLE001 — surface any error in status
        return {"status": "fetch_failed", "error": repr(e)}
    if text is None:
        return {"status": "fetch_failed", "error": "fetcher returned None"}

    chunker = get_chunker(doc_row["chunking_version"])
    new_chunk_strs = chunker.split(canonicalize(text))
    new_leaves = [hash_leaf(s.encode("utf-8")) for s in new_chunk_strs]
    new_root = MerkleTree.build(new_leaves).root.hex()

    if new_root != document_root:
        with transaction(conn):
            append_audit(
                conn,
                event_type="rehydrate_drift",
                subject_root=document_root,
                body={
                    "expected_root": document_root,
                    "actual_root": new_root,
                    "uri": uri,
                },
            )
            # v9.8 falsification: any cached providence record from this
            # source is now stale.
            conn.execute(
                "UPDATE providence_cache SET falsification_state = 'stale' "
                "WHERE source_root = ? AND falsification_state = 'live'",
                (document_root,),
            )
        return {
            "status": "drift_detected",
            "expected_root": document_root,
            "actual_root": new_root,
        }

    # Roots match. Restore content for every cold chunk whose leaf verifies.
    restored = 0
    with transaction(conn):
        for c in cold_chunks:
            i = c["idx"]
            # Skip indices outside the new chunk list; without the lower bound
            # a negative idx would wrap around to the end of the list.
            if not 0 <= i < len(new_chunk_strs):
                continue
            # Defensive: shouldn't happen if roots match, but bail safely.
            # The leaf was already hashed for the root computation above.
            if new_leaves[i].hex() != c["leaf_hash"]:
                continue
            chunk_text = new_chunk_strs[i]
            conn.execute(
                "UPDATE chunks SET content = ?, tier = 'hot' "
                "WHERE document_root = ? AND idx = ?",
                (pack_chunk(chunk_text), document_root, i),
            )
            conn.execute(
                "INSERT INTO chunks_fts (rowid, content) VALUES (?, ?)",
                (c["chunk_id"], chunk_text),
            )
            restored += 1
        append_audit(
            conn,
            event_type="rehydrate_success",
            subject_root=document_root,
            body={"chunks_restored": restored, "uri": uri},
        )
    return {"status": "rehydrated", "chunks_restored": restored}