"""Reversible eviction + rehydrate.
Implements the systematic-forgetting mechanic from the design philosophy:
- evict_to_cold: surface chunks demote from `hot` to `cold`; content set to
NULL, FTS5 row deleted. leaf_hash retained — identity preserved.
- rehydrate: refetch URI through the same source pipeline, re-chunk with the
original chunking_version, compare leaves and root. Match -> content
restored, tier hot. Mismatch -> drift event in audit chain, providence
records flipped to falsification_state='stale'. No content restored.
Cores never evict.
"""
from __future__ import annotations
import sqlite3
import time
from typing import Callable, Iterable
from arborist.compress import pack_chunk
from arborist.document import canonicalize, get_chunker
from arborist.merkle import MerkleTree, hash_leaf
from arborist.store import append_audit, transaction
# Re-fetcher signature: takes a document URI and returns its text, or None
# when nothing could be fetched. rehydrate() runs the returned text through
# canonicalize() itself and reports a None result as ``fetch_failed``.
Fetcher = Callable[[str], str | None]
def _default_html_fetcher(uri: str) -> str | None:
    """Fetch *uri* through HtmlPageSource and return the first document's content.

    Going through HtmlPageSource keeps rehydrate on the same pipeline that
    ingest uses. Returns None when the HTML source module cannot be imported
    or when the source yields no documents.
    """
    try:
        from arborist.sources.html_page import HtmlPageSource
    except ImportError:
        # HTML support is optional; without it there is nothing to fetch with.
        return None

    documents = iter(HtmlPageSource([uri]).iter_documents())
    try:
        first_document = next(documents)
    except StopIteration:
        return None
    return first_document.content
# Default fetchers keyed by ``documents.source_type``. rehydrate() consults
# this table only when the caller does not pass its own fetcher; a source
# type with no entry here yields the ``source_not_rehydratable`` status.
_FETCHERS: dict[str, Fetcher] = {
    "html": _default_html_fetcher,
}
[docs]
def evict_to_cold(
    conn: sqlite3.Connection,
    *,
    source_type: str | None = None,
    older_than_days: int | None = None,
    document_roots: Iterable[str] | None = None,
) -> dict:
    """Demote matching surface chunks from hot to cold.

    Only chunks that are currently ``tier = 'hot'`` and belong to a document
    with ``kind = 'surface'`` are selected, so cores are never evicted. For
    every selected chunk the FTS row is deleted and the chunk row gets
    ``content = NULL`` and ``tier = 'cold'``; ``leaf_hash`` is not touched.
    One ``evict_cold`` audit event is appended per affected document, inside
    the same transaction as the chunk updates.

    Args:
        conn: Open database connection. Rows are read by column name, so the
            connection needs a name-addressable row factory (for example
            ``sqlite3.Row``).
        source_type: When truthy, only documents with this ``source_type``
            are considered.
        older_than_days: When given, only documents whose ``ingest_ts`` is
            earlier than "now minus this many days" (epoch seconds) are
            considered.
        document_roots: When given, only these document roots are
            considered. An empty iterable evicts nothing.

    Returns:
        ``{"evicted_chunks": int, "documents_affected": int}``.
    """
    where = ["d.kind = 'surface'", "c.tier = 'hot'"]
    params: list = []
    if source_type:
        where.append("d.source_type = ?")
        params.append(source_type)
    if older_than_days is not None:
        cutoff = int(time.time()) - older_than_days * 86400
        where.append("d.ingest_ts < ?")
        params.append(cutoff)
    if document_roots is not None:
        roots = list(document_roots)
        if not roots:
            # An explicit empty selection means "evict nothing", not "no filter".
            return {"evicted_chunks": 0, "documents_affected": 0}
        placeholders = ",".join("?" for _ in roots)
        where.append(f"c.document_root IN ({placeholders})")
        params.extend(roots)

    # Fetch chunk_id together with the candidates so the loop below does not
    # need an extra lookup per chunk.
    sql = (
        "SELECT c.chunk_id, c.document_root FROM chunks c "
        "JOIN documents d ON d.document_root = c.document_root "
        "WHERE " + " AND ".join(where)
    )
    candidates = conn.execute(sql, params).fetchall()
    if not candidates:
        return {"evicted_chunks": 0, "documents_affected": 0}

    per_doc: dict[str, int] = {}
    with transaction(conn):
        for row in candidates:
            chunk_id = row["chunk_id"]
            # Remove the FTS row before the content goes away. FTS rows are
            # addressed by rowid, which equals chunks.chunk_id.
            conn.execute("DELETE FROM chunks_fts WHERE rowid=?", (chunk_id,))
            conn.execute(
                "UPDATE chunks SET content=NULL, tier='cold' WHERE chunk_id=?",
                (chunk_id,),
            )
            doc_root = row["document_root"]
            per_doc[doc_root] = per_doc.get(doc_root, 0) + 1
        for doc_root, n in per_doc.items():
            append_audit(
                conn,
                event_type="evict_cold",
                subject_root=doc_root,
                body={"chunks_evicted": n},
            )
    return {
        "evicted_chunks": len(candidates),
        "documents_affected": len(per_doc),
    }
[docs]
def rehydrate(
    conn: sqlite3.Connection,
    document_root: str,
    *,
    fetcher: Fetcher | None = None,
) -> dict:
    """Refetch a document and restore its cold chunks only if the root matches.

    The document's URI is fetched again, canonicalized, and split with the
    chunker named by the stored ``chunking_version``. A Merkle root is built
    from the new chunk hashes and compared with ``document_root``.

    * On a match, every cold chunk whose index exists in the new chunk list
      and whose stored ``leaf_hash`` equals the recomputed hash gets its
      packed content back, is set to ``tier = 'hot'``, and is re-inserted
      into ``chunks_fts``. A ``rehydrate_success`` audit event is appended.
    * On a mismatch, a ``rehydrate_drift`` audit event is appended and
      ``providence_cache`` rows for this source are flipped from ``'live'``
      to ``'stale'``. No content is restored.

    Args:
        conn: Open database connection. Rows are read by column name, so the
            connection needs a name-addressable row factory (for example
            ``sqlite3.Row``).
        document_root: Root of the document to rehydrate.
        fetcher: Optional callable used instead of the default fetcher
            registered for the document's ``source_type``.

    Returns:
        A dict whose ``status`` is one of:

        * ``unknown_document`` - no document row for ``document_root``.
        * ``nothing_to_do`` - the document has no cold chunks
          (also carries ``cold_chunks: 0``).
        * ``source_not_rehydratable`` - no fetcher is available
          (also carries ``source_type``).
        * ``fetch_failed`` - the fetcher raised or returned None
          (also carries ``error``).
        * ``drift_detected`` - roots differ
          (also carries ``expected_root`` and ``actual_root``).
        * ``rehydrated`` - roots match (also carries ``chunks_restored``).
    """
    doc_row = conn.execute(
        "SELECT document_uri, source_type, chunking_version "
        "FROM documents WHERE document_root = ?",
        (document_root,),
    ).fetchone()
    if doc_row is None:
        return {"status": "unknown_document"}

    # chunk_id is fetched here so restoring a chunk needs no further lookup.
    cold_chunks = conn.execute(
        "SELECT chunk_id, idx, leaf_hash FROM chunks "
        "WHERE document_root = ? AND tier = 'cold' ORDER BY idx",
        (document_root,),
    ).fetchall()
    if not cold_chunks:
        return {"status": "nothing_to_do", "cold_chunks": 0}

    # Pick fetcher by source_type unless the caller supplies one. Compare
    # against None explicitly so a falsy callable object is still honoured.
    if fetcher is not None:
        use_fetcher = fetcher
    else:
        use_fetcher = _FETCHERS.get(doc_row["source_type"])
    if use_fetcher is None:
        return {
            "status": "source_not_rehydratable",
            "source_type": doc_row["source_type"],
        }

    try:
        text = use_fetcher(doc_row["document_uri"])
    except Exception as e:  # noqa: BLE001 — surface any error in status
        return {"status": "fetch_failed", "error": repr(e)}
    if text is None:
        return {"status": "fetch_failed", "error": "fetcher returned None"}

    chunker = get_chunker(doc_row["chunking_version"])
    new_text = canonicalize(text)
    new_chunk_strs = chunker.split(new_text)
    new_leaves = [hash_leaf(c.encode("utf-8")) for c in new_chunk_strs]
    # Snapshot the hex digests before building the tree so the per-chunk
    # check below reuses them instead of hashing every chunk a second time.
    new_leaf_hexes = [leaf.hex() for leaf in new_leaves]
    new_root = MerkleTree.build(new_leaves).root.hex()

    if new_root != document_root:
        with transaction(conn):
            append_audit(
                conn,
                event_type="rehydrate_drift",
                subject_root=document_root,
                body={
                    "expected_root": document_root,
                    "actual_root": new_root,
                    "uri": doc_row["document_uri"],
                },
            )
            # v9.8 falsification: any cached providence record from this
            # source is now stale.
            conn.execute(
                "UPDATE providence_cache SET falsification_state = 'stale' "
                "WHERE source_root = ? AND falsification_state = 'live'",
                (document_root,),
            )
        return {
            "status": "drift_detected",
            "expected_root": document_root,
            "actual_root": new_root,
        }

    # Roots match. Restore content for every cold chunk.
    restored = 0
    with transaction(conn):
        for chunk in cold_chunks:
            i = chunk["idx"]
            if i >= len(new_chunk_strs):
                continue
            if new_leaf_hexes[i] != chunk["leaf_hash"]:
                # Defensive: shouldn't happen if roots match, but bail safely.
                continue
            chunk_text = new_chunk_strs[i]
            conn.execute(
                "UPDATE chunks SET content = ?, tier = 'hot' WHERE chunk_id = ?",
                (pack_chunk(chunk_text), chunk["chunk_id"]),
            )
            conn.execute(
                "INSERT INTO chunks_fts (rowid, content) VALUES (?, ?)",
                (chunk["chunk_id"], chunk_text),
            )
            restored += 1
        append_audit(
            conn,
            event_type="rehydrate_success",
            subject_root=document_root,
            body={"chunks_restored": restored, "uri": doc_row["document_uri"]},
        )
    return {"status": "rehydrated", "chunks_restored": restored}