"""SQLite-backed v9.8 store.
Schema implements the Merkle-AGI v9.8 admissibility ledger:
- 8-dim providence_cache key (source_root, question_hash,
model_profile_hash, conversation_hash, governance_policy_hash,
schema_version, canonicalization_version, chunking_version)
- falsification_state ∈ {live, failed, stale, quarantined}
- audit_events append-only chain (event_hash chains via prev_event_hash)
- documents.kind ∈ {surface, core} for layered compression
- chunks.tier ∈ {hot, warm, cold} for reversible eviction
- derivations table binds core docs back to source surface roots
The providence_cache layer is schema-only in Phase 0 — no Q&A inference yet.
"""
from __future__ import annotations
import json
import sqlite3
import time
from contextlib import contextmanager
from pathlib import Path
from typing import Iterator
# Default shard location used by ``connect()`` when no path is given:
# ``~/.arborist/arborist.db`` under the current user's home directory.
DEFAULT_DB_PATH = Path.home().joinpath(".arborist", "arborist.db")
SCHEMA_SQL = """
PRAGMA journal_mode = WAL;
PRAGMA foreign_keys = ON;
CREATE TABLE IF NOT EXISTS schema_meta (
key TEXT PRIMARY KEY,
value TEXT NOT NULL
);
-- Free-form per-DB metadata. Used by the resume mechanic to track each
-- source's high-water mark so a stopped ingest can rsync forward without
-- re-parsing rows that are already in this DB.
CREATE TABLE IF NOT EXISTS meta (
key TEXT PRIMARY KEY,
value TEXT NOT NULL,
updated_at INTEGER
);
-- Documents: surface (raw ingest) or core (distilled, Merkle-signed back).
CREATE TABLE IF NOT EXISTS documents (
document_root TEXT PRIMARY KEY, -- hex sha256 of merkle root
document_uri TEXT NOT NULL,
source_type TEXT NOT NULL,
kind TEXT NOT NULL DEFAULT 'surface'
CHECK (kind IN ('surface','core')),
compression_depth INTEGER NOT NULL DEFAULT 0,
title TEXT,
chunking_version TEXT NOT NULL,
canonicalization_version TEXT NOT NULL,
schema_version TEXT NOT NULL,
ingest_ts INTEGER NOT NULL,
hit_count INTEGER NOT NULL DEFAULT 0,
last_hit_at INTEGER
);
CREATE INDEX IF NOT EXISTS idx_documents_uri ON documents(document_uri);
CREATE INDEX IF NOT EXISTS idx_documents_kind ON documents(kind);
-- Per-document HTTP metadata for cheap recrawl-checks. ETag and
-- Last-Modified come from the response headers at ingest time and let a
-- recrawl pass send conditional HEAD requests (If-None-Match /
-- If-Modified-Since); a 304 lets us skip the body fetch entirely.
-- last_status / last_checked_at record the most recent recheck so
-- operators can audit "when was this URL last verified."
CREATE TABLE IF NOT EXISTS document_http_meta (
document_root TEXT PRIMARY KEY,
etag TEXT,
last_modified TEXT,
last_fetched_at INTEGER NOT NULL,
last_status INTEGER,
last_checked_at INTEGER,
FOREIGN KEY (document_root) REFERENCES documents(document_root) ON DELETE CASCADE
);
-- Chunks with tier-based reversible eviction.
-- content nullable: cold tier evicts content but retains leaf_hash + URI for
-- rehydration. Identity verified on rehydrate by recomputing leaf_hash.
--
-- chunk_id INTEGER PRIMARY KEY AUTOINCREMENT serves dual duty: it's both the
-- primary key and the rowid that the contentless FTS5 virtual table joins
-- against. The (document_root, idx) UNIQUE constraint preserves the prior
-- "one chunk per (doc, position)" invariant for callers that look up by it.
CREATE TABLE IF NOT EXISTS chunks (
chunk_id INTEGER PRIMARY KEY AUTOINCREMENT,
document_root TEXT NOT NULL,
idx INTEGER NOT NULL,
leaf_hash TEXT NOT NULL,
content TEXT,
tier TEXT NOT NULL DEFAULT 'hot'
CHECK (tier IN ('hot','warm','cold')),
UNIQUE (document_root, idx),
FOREIGN KEY (document_root) REFERENCES documents(document_root) ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS idx_chunks_leaf ON chunks(leaf_hash);
-- Interior Merkle nodes (layer >= 1). Layer 0 lives in chunks.leaf_hash.
CREATE TABLE IF NOT EXISTS merkle_nodes (
document_root TEXT NOT NULL,
layer INTEGER NOT NULL,
idx INTEGER NOT NULL,
hash TEXT NOT NULL,
PRIMARY KEY (document_root, layer, idx),
FOREIGN KEY (document_root) REFERENCES documents(document_root) ON DELETE CASCADE
);
-- Cross-links between documents (the forest).
-- Unresolved forward links (dst not yet ingested) carry dst_root='' and the
-- ingest pass backfills dst_root when the target appears.
--
-- WITHOUT ROWID: the PK covers every column, so a default rowid-based table
-- would near-duplicate the row data in the PK index. WITHOUT ROWID makes
-- the table itself a B-tree keyed on the PK and saves ~50% of edge storage
-- on real Wikipedia ingests (measured: 38 MB -> 21 MB / 1000 docs).
-- Behaviorally identical; only the on-disk layout changes.
CREATE TABLE IF NOT EXISTS edges (
src_root TEXT NOT NULL,
dst_root TEXT NOT NULL DEFAULT '', -- '' = unresolved, backfilled later
dst_uri TEXT NOT NULL DEFAULT '', -- always present so we can resolve later
edge_type TEXT NOT NULL, -- wikilink, citation, derived_from, ...
anchor TEXT NOT NULL DEFAULT '', -- chunk index or fragment, '' if N/A
PRIMARY KEY (src_root, edge_type, dst_root, dst_uri, anchor)
) WITHOUT ROWID;
CREATE INDEX IF NOT EXISTS idx_edges_dst_root ON edges(dst_root) WHERE dst_root <> '';
-- No idx_edges_dst_uri: the gravity_top_inbound analytical query in cli.py
-- counts inbound links per resolved destination *document* (dst_root), which
-- this partial index already serves with a streaming GROUP BY. (An earlier
-- formulation grouped by the raw dst_uri link string with no index and
-- hash-aggregated over every target -> unbounded RSS at corpus scale.)
-- Distillation: core_root <- src_root with Merkle-signed proof binding.
CREATE TABLE IF NOT EXISTS derivations (
core_root TEXT NOT NULL,
src_root TEXT NOT NULL,
proof_blob TEXT NOT NULL, -- JSON merkle proof
process_id TEXT NOT NULL, -- distillation process identifier
distilled_at INTEGER NOT NULL,
PRIMARY KEY (core_root, src_root, process_id),
FOREIGN KEY (core_root) REFERENCES documents(document_root) ON DELETE CASCADE,
FOREIGN KEY (src_root) REFERENCES documents(document_root) ON DELETE CASCADE
);
-- v9.8 providence cache: 8-dim admissibility key + falsification state.
-- Schema-only in Phase 0 (no Q&A runs yet); ready for Phase 1.
CREATE TABLE IF NOT EXISTS providence_cache (
cache_key TEXT PRIMARY KEY,
source_root TEXT NOT NULL,
document_uri TEXT NOT NULL,
question_hash TEXT NOT NULL,
question_text TEXT NOT NULL,
answer_text TEXT NOT NULL,
merkle_proof TEXT NOT NULL, -- JSON
model_profile_hash TEXT NOT NULL, -- model_id + revision + quantization
conversation_hash TEXT NOT NULL,
governance_policy_hash TEXT NOT NULL,
schema_version TEXT NOT NULL,
canonicalization_version TEXT NOT NULL,
chunking_version TEXT NOT NULL,
falsification_state TEXT NOT NULL DEFAULT 'live'
CHECK (falsification_state IN ('live','failed','stale','quarantined')),
chain TEXT NOT NULL DEFAULT 'private'
CHECK (chain IN ('private','public')),
audit_event_hash TEXT, -- latest audit event for this record
created_at INTEGER NOT NULL,
last_hit_at INTEGER,
hit_count INTEGER NOT NULL DEFAULT 0,
-- v9.8 audit_mode trichotomy (RAG-adapted vocabulary; substrate calls
-- UNGROUNDED "VISUAL"): STRICT (every quote in answer verified against
-- context), HYBRID (some claims verified, some emergent), UNGROUNDED
-- (no verbatim grounding — purely emergent from training).
-- Default UNGROUNDED: an unclassified record is the weakest claim.
audit_mode TEXT NOT NULL DEFAULT 'UNGROUNDED'
CHECK (audit_mode IN ('STRICT','HYBRID','UNGROUNDED','CANONICAL_PROJECTION')),
n_quotes INTEGER NOT NULL DEFAULT 0,
n_verified INTEGER NOT NULL DEFAULT 0,
-- JSON array of quoted spans the model produced but we couldn't find
-- verbatim in context. Mining these surfaces "what the model emerged
-- beyond the corpus" — candidate ingest targets.
unverified_quotes TEXT,
-- Which verifier strategy classified this record. 'quote' = model
-- followed the format and wrapped claims in double quotes. 'span' =
-- bullet/sentence-level substring match in context. 'entity' = no
-- spans matched but multi-word proper nouns did. 'paraphrase' =
-- token-coverage match (soft signal). 'claim_lattice' = quote-by-
-- pointer mode (model emitted JSON; verifier checked evidence_id
-- resolution + source_role + manual-quote prohibition).
-- 'canonical_projection' = answer produced by a deterministic π*
-- kernel (arithmetic@v1, logic-kernel@v1, …) — the canonical bytes
-- ARE the proof. 'none' = no evidence at all (truly emergent).
verifier_method TEXT NOT NULL DEFAULT 'none'
CHECK (verifier_method IN ('quote','span','entity','paraphrase','claim_lattice','canonical_projection','none')),
-- Per-run Merkle-DAG. run_dag_root = MerkleTree over ordered stage
-- hashes (question / retrieval / context / prompt / answer / verify /
-- final_label). run_dag_blob carries the full {root, nodes} JSON so
-- an auditor can recompute & verify. Distinct from audit_event_hash
-- (linear DB-wide chain). NULL on legacy records pre-2026-04-30.
run_dag_root TEXT,
run_dag_blob TEXT
);
CREATE INDEX IF NOT EXISTS idx_providence_root ON providence_cache(source_root);
CREATE INDEX IF NOT EXISTS idx_providence_state ON providence_cache(falsification_state);
-- idx_providence_audit lives in _migrate_audit_mode() so legacy shards
-- (where audit_mode column gets added by ALTER TABLE) don't trip this
-- script before the migration runs.
-- Append-only audit chain. event_hash = sha256(prev_event_hash || canonical(body)).
CREATE TABLE IF NOT EXISTS audit_events (
seq INTEGER PRIMARY KEY AUTOINCREMENT,
event_hash TEXT NOT NULL UNIQUE,
prev_event_hash TEXT, -- NULL for genesis
event_type TEXT NOT NULL, -- ingest|falsify|evict_warm|evict_cold|derive|rehydrate|...
subject_root TEXT, -- document_root or cache_key
body TEXT NOT NULL, -- canonical JSON
ts INTEGER NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_audit_subject ON audit_events(subject_root);
-- Falsification log: which records were marked failed/stale/quarantined and why.
CREATE TABLE IF NOT EXISTS falsifications (
cache_key TEXT NOT NULL,
state TEXT NOT NULL,
reason TEXT,
by_actor TEXT,
at INTEGER NOT NULL,
audit_event_hash TEXT NOT NULL,
PRIMARY KEY (cache_key, at)
);
-- Snapshots: corpus-level Merkle root pinning a forest state at a point in
-- time. snapshot_root = MerkleTree.build([sorted document_roots]). Audit-
-- chain-linked so peers can verify a claimed snapshot was actually witnessed
-- by this instance. parent_snapshot lets snapshots chain (A -> B -> C) for
-- diff/replay. doc_count is informational; the root is the canonical id.
CREATE TABLE IF NOT EXISTS snapshots (
snapshot_root TEXT PRIMARY KEY,
taken_at INTEGER NOT NULL,
audit_event_hash TEXT NOT NULL,
doc_count INTEGER NOT NULL,
parent_snapshot TEXT,
reason TEXT
);
CREATE INDEX IF NOT EXISTS idx_snapshots_taken_at ON snapshots(taken_at);
-- Mesh layer tables. Off by default — populated only when the user runs
-- `arborist mesh init`. Never accessed by ingest / query / distill paths;
-- mesh state is opt-in plumbing for federated peers (see arborist.mesh).
CREATE TABLE IF NOT EXISTS mesh_identity (
id INTEGER PRIMARY KEY CHECK (id = 1), -- singleton
member_id TEXT NOT NULL UNIQUE,
sign_priv BLOB NOT NULL, -- ed25519 32B raw
sign_pub BLOB NOT NULL, -- ed25519 32B raw
dh_priv BLOB NOT NULL, -- x25519 32B raw
dh_pub BLOB NOT NULL, -- x25519 32B raw
group_name TEXT NOT NULL,
created_at INTEGER NOT NULL
);
-- Per-epoch roster. epoch 0 = group genesis (founder only). Each membership
-- mutation (join, kick, scheduled rotate) bumps the epoch_id by 1 and writes
-- a fresh row-set capturing the new roster.
CREATE TABLE IF NOT EXISTS mesh_roster (
epoch_id INTEGER NOT NULL,
member_id TEXT NOT NULL,
sign_pub BLOB NOT NULL,
dh_pub BLOB NOT NULL,
role TEXT NOT NULL DEFAULT 'member'
CHECK (role IN ('admin','member')),
PRIMARY KEY (epoch_id, member_id)
);
CREATE INDEX IF NOT EXISTS idx_mesh_roster_member ON mesh_roster(member_id);
-- Epoch lifecycle log. secret_envelope is JSON of the form
-- {"member_id": {"nonce_b64": "...", "ct_b64": "..."}, ...}
-- where each entry is the symmetric epoch secret AEAD-wrapped to that
-- member's X25519 pubkey via ECDH. Eviction happens by NOT including the
-- evicted member's entry in the next epoch's envelope.
CREATE TABLE IF NOT EXISTS mesh_epochs (
epoch_id INTEGER PRIMARY KEY,
started_at INTEGER NOT NULL,
started_event_hash TEXT NOT NULL,
secret_envelope TEXT NOT NULL,
reason TEXT
);
-- Per-peer audit-chain tracking. Each row records the most recent
-- event_hash a given peer has broadcast to us; we enforce that every
-- subsequent gossip envelope carries `prev_event_hash == last_event_hash`
-- of that peer. A mismatch is a fork — the gossip is rejected (409).
-- last_seq is the local count of accepted envelopes from that peer
-- (informational; the canonical chain identity is last_event_hash).
CREATE TABLE IF NOT EXISTS mesh_peer_chains (
peer_member_id TEXT PRIMARY KEY,
last_event_hash TEXT NOT NULL,
last_seq INTEGER NOT NULL,
last_seen_at INTEGER NOT NULL
);
-- FTS5 over chunk content for UNGROUNDED-mode keyword search.
--
-- Contentless mode (`content=''`): FTS5 stores ONLY the inverted index, no
-- copy of the indexed text. This eliminates the ~28 MB / 1000 docs that the
-- prior schema spent on chunks_fts_content (the stored copy was redundant
-- with chunks.content). The trade: snippet() / highlight() return empty
-- in contentless mode, so the FTS5 backend builds snippets in Python by
-- joining `chunks_fts.rowid = chunks.chunk_id`, decompressing chunks.content,
-- and locating query tokens.
--
-- Inserts use `INSERT INTO chunks_fts (rowid, content) VALUES (chunk_id, plain)`
-- — the rowid must equal the chunks.chunk_id of the underlying row so the
-- search-time join lines up.
CREATE VIRTUAL TABLE IF NOT EXISTS chunks_fts USING fts5(
content,
content='',
contentless_delete=1,
tokenize = 'porter unicode61'
);
-- Document-title FTS5 index. Replaces the un-indexable
-- `LOWER(title) LIKE '%tok%'` title-LIKE search with O(K) hash
-- lookup. Pre-2026-05-02 the title-LIKE backup was either skipped
-- (>5 tokens) or paid ~10s/shard for short queries. The MATCH-based
-- replacement runs in ~0.05s/shard regardless of token count, which
-- means we can re-enable synonym-expanded title search for long
-- queries without paying the corpus-scan cost.
--
-- Contentless mode: same trick as chunks_fts — store only the
-- inverted index, not a copy of the title. The rowid joins back to
-- documents.rowid (sqlite's hidden integer rowid is fine for a
-- 1-1 mapping). On re-ingest, the FTS5 row gets replaced via the
-- ingest path's INSERT OR REPLACE INTO documents flow.
CREATE VIRTUAL TABLE IF NOT EXISTS documents_fts USING fts5(
title,
content='',
contentless_delete=1,
tokenize = 'porter unicode61'
);
-- Concept-relations layer. Append-only secondary index over the corpus.
-- Each row is a (token, target) edge of a given relation_kind, derived
-- from a specific source document by a specific extractor (evidence_kind).
-- Re-derivation is idempotent at the (source_root, relation_kind, token,
-- target, evidence_kind) level via UNIQUE.
--
-- This table is SEPARATE from the Merkle layer: writes here NEVER affect
-- document_root / chunk_root / cache_key. So the corpus's whole Merkle
-- tree stays valid across re-derivations; we can backfill or re-extract
-- concept relations without invalidating any cached answers.
--
-- Cross-shard lookup. Concept relations live in the shard whose document
-- they were derived from; the lookup helpers in arborist.concepts walk all
-- shards (same pattern as cross-shard FTS5 search). Mesh sync moves shards
-- between peers; concept relations come along for the ride automatically.
--
-- relation_kind:
-- 'synonym' - token & target retrieve interchangeably (See-also
-- bidirectional, redirect target, internal-link cluster)
-- 'antonym' - token & target are explicit opposites (manual / hatnote
-- "not to be confused with")
-- 'rivalry' - token & target compete in a category (same-category
-- membership without cross-link; manual rivalries)
-- 'category' - token belongs to category target (Wikipedia
-- [[Category:X]] tail; HTML schema.org/<meta> classification)
--
-- evidence_kind: which extractor produced the row. Lets `arborist concepts
-- purge --evidence-kind X` revoke a single extractor's output cleanly
-- without touching manual or other-extractor rows. New extractors register
-- a stable evidence_kind string; legacy seeds are 'manual_legacy'.
CREATE TABLE IF NOT EXISTS concept_relations (
id INTEGER PRIMARY KEY AUTOINCREMENT,
source_root TEXT NOT NULL,
relation_kind TEXT NOT NULL
CHECK (relation_kind IN ('synonym','antonym','rivalry','category')),
token TEXT NOT NULL,
target TEXT NOT NULL,
evidence_kind TEXT NOT NULL,
confidence REAL NOT NULL DEFAULT 1.0,
derived_at INTEGER NOT NULL,
derived_from TEXT, -- shard/uri/extractor identifier
UNIQUE (source_root, relation_kind, token, target, evidence_kind)
);
CREATE INDEX IF NOT EXISTS idx_concept_token ON concept_relations(token);
CREATE INDEX IF NOT EXISTS idx_concept_target ON concept_relations(target);
CREATE INDEX IF NOT EXISTS idx_concept_kind ON concept_relations(relation_kind);
CREATE INDEX IF NOT EXISTS idx_concept_evid ON concept_relations(evidence_kind);
-- Per-token corpus document-frequency (for IDF ranking at synonym
-- expansion cap-time). Computed once at backfill via fts5vocab over
-- chunks_fts. Only tokens that appear in concept_relations get a row;
-- the synonym layer is the consumer & it ranks expansion by 1/log(doc_freq)
-- when the cap saturates so common-corpus words drop before rare topical
-- ones.
--
-- doc_freq is FTS5-chunk-level (number of chunks containing the term);
-- adequate proxy for true doc-level since chunks are sized 512 tokens
-- and a doc rarely has the same term in only one chunk. Cross-shard
-- ranking sums doc_freq across all shards' rows.
CREATE TABLE IF NOT EXISTS concept_token_idf (
token TEXT PRIMARY KEY,
doc_freq INTEGER NOT NULL,
total_docs INTEGER NOT NULL,
derived_at INTEGER NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_token_idf_freq ON concept_token_idf(doc_freq);
-- Adapter LossReport sidecar (ticket #000022). Per-chunk record of what
-- bytes the adapter / canonicalizer dropped, transformed, or normalized
-- on the way from raw bytes to the prose the model sees.
--
-- DESIGN: SIDECAR ONLY. This table never enters cache_key, document_root,
-- run_dag_root, or audit_events.event_hash preimage. Its policy lives in
-- ``loss_report_policy_hash`` (column below) — separate from the QA
-- ``governance_policy_hash`` so toggling reporting does NOT invalidate
-- prior cache entries. See ticket §3.7.
--
-- ``loss_mode`` semantics (closed enum):
-- 'pure_drop' content removed entirely (e.g. <ref> blocks).
-- sum(bytes_dropped) MUST be <= input_length_bytes.
-- 'transform' content rewritten to different bytes (e.g.
-- [[Velociraptor]] -> Velociraptor). bytes_dropped
-- records a delta, not a deletion. Excluded from
-- the byte-conservation property test.
-- 'quarantine' content preserved elsewhere but withheld from prose.
-- Reserved; no v0 emitter.
-- 'normalize' semantically low-content formatting normalization
-- (whitespace runs, newline collapse, runs).
-- Excluded from byte-conservation accounting.
--
-- ``loss_kind`` is free-string in v0 (per ticket §4 out-of-scope) so
-- adapter authors can mint new kinds. Promotion to enum after >=3
-- adapters are in tree.
--
-- Document-scope losses (e.g. HTML _normalize_text drops that happen
-- before chunking) are anchored to chunk_id of the lowest-idx chunk
-- of the document by convention.
CREATE TABLE IF NOT EXISTS adapter_loss_reports (
chunk_id INTEGER NOT NULL,
document_root TEXT NOT NULL,
stage TEXT NOT NULL,
canonicalization_version TEXT NOT NULL,
loss_kind TEXT NOT NULL,
loss_mode TEXT NOT NULL
CHECK (loss_mode IN ('pure_drop','transform','quarantine','normalize')),
bytes_dropped INTEGER NOT NULL,
occurrence_count INTEGER NOT NULL,
input_length_bytes INTEGER,
output_length_bytes INTEGER,
sample_excerpt TEXT,
sample_hash TEXT,
adapter_name TEXT,
adapter_version TEXT,
loss_report_policy_hash TEXT NOT NULL,
created_at INTEGER NOT NULL,
PRIMARY KEY (chunk_id, stage, canonicalization_version, loss_kind)
);
CREATE INDEX IF NOT EXISTS idx_adapter_loss_doc
ON adapter_loss_reports(document_root, stage);
CREATE INDEX IF NOT EXISTS idx_adapter_loss_kind
ON adapter_loss_reports(loss_kind, stage);
CREATE INDEX IF NOT EXISTS idx_adapter_loss_chunk
ON adapter_loss_reports(chunk_id, stage);
-- Citation aliases (#000041): map a claim-pack record's original
-- source_reference string to a substitute citation when the original
-- text isn't ingestable (proprietary / unavailable) but a comparable
-- PD work is. Read at warrant-resolve time when --use-aliases is
-- passed; alias-resolved derivations carry process_id
-- "warrant-resolver-v1+alias" so the audit trail tells substituted
-- chains from original ones.
--
-- Audit discipline: decision_at + decision_by + decision_rationale
-- are NOT NULL. The CLI refuses to add rows without those fields.
-- "Honest substitute, not silent fabrication."
CREATE TABLE IF NOT EXISTS citation_aliases (
original_ref TEXT NOT NULL,
substitute_ref TEXT NOT NULL,
substitute_authors TEXT NOT NULL, -- JSON array of strings
substitute_title TEXT NOT NULL,
decision_at INTEGER NOT NULL,
decision_by TEXT NOT NULL,
decision_rationale TEXT,
PRIMARY KEY (original_ref, substitute_ref)
);
CREATE INDEX IF NOT EXISTS idx_citation_alias_orig
ON citation_aliases(original_ref);
-- Term aliases (#000042): bridge vocabulary mismatches between a
-- claim-pack record's modern term and a textbook's historical /
-- foreign-language / pre-modern term for the same concept. E.g.,
-- modern "incidence" ↔ Hilbert-1902-translation "connection" in
-- the geometry domain. Read at warrant-resolve time when
-- --use-aliases is passed; query expansion replaces a token with
-- (token OR alt) so FTS5 matches either vocabulary.
--
-- Same audit discipline as citation_aliases.
CREATE TABLE IF NOT EXISTS term_aliases (
term TEXT NOT NULL,
alternate_term TEXT NOT NULL,
domain TEXT NOT NULL, -- e.g., "geometry", "logic"
decision_at INTEGER NOT NULL,
decision_by TEXT NOT NULL,
decision_rationale TEXT,
PRIMARY KEY (term, alternate_term, domain)
);
CREATE INDEX IF NOT EXISTS idx_term_alias_term_domain
ON term_aliases(term, domain);
"""
# Process-local migration memoization (#000026 Phase 1). Keyed by
# resolved path. Migrations are forward-only and idempotent within
# a code version, so once we've run them on a path in this process,
# we skip on every subsequent `connect()`. Pre-fix: 588 `connect()`
# calls per typical query, each running 7 migration probes. Post-
# fix: 1 probe sequence per (path, process), then skipped.
#
# Tradeoff: this assumes a path identifies a single SQLite file for
# the lifetime of the process. If a caller replaces the file at the
# same path (e.g. a test deletes + recreates), they MUST call
# `invalidate_migration_cache(path)` (or `_clear_migration_cache()`).
# We don't try to detect replacement automatically — (dev, inode)
# fails under tmpfs inode reuse, and (mtime, size) drift naturally
# as SQLite operates on the file (WAL checkpoints, page growth).
# Path-only with explicit invalidation is honest about the contract.
# Holds ``str(Path(db_path).resolve())`` for every shard whose schema +
# migration pass has already completed in this process. ``connect()`` adds
# entries; the two cache-invalidation helpers below remove them.
_MIGRATED_SHARDS: set[str] = set()
def _clear_migration_cache() -> None:
    """Forget every memoized "already migrated" claim.

    After this call the next ``connect()`` on any path re-runs the schema
    script and the forward migrations. Intended mainly for tests, and for
    callers that deliberately swap a shard file in place at an unchanged
    path (snapshot-restore style flows).
    """
    # Mutate in place: other functions in this module hold a reference to
    # the same set object, so it must not be rebound.
    _MIGRATED_SHARDS.clear()
[docs]
def invalidate_migration_cache(db_path: Path | str) -> None:
    """Forget the memoized migration claim for a single shard path.

    Use this after the file behind ``db_path`` has been replaced; the next
    ``connect()`` on that path then runs the full migration probe sequence
    again. Unknown paths are ignored silently.

    Args:
        db_path: Path of the shard, in any form that resolves to the same
            absolute path ``connect()`` was given.
    """
    # Same normalisation ``connect()`` applies when it records a shard.
    resolved = Path(db_path).resolve()
    _MIGRATED_SHARDS.discard(str(resolved))
[docs]
def connect(db_path: Path | str = DEFAULT_DB_PATH) -> sqlite3.Connection:
    """Open a writable connection, creating the parent dir + schema if needed.

    Performance pragmas applied per-connection. Under WAL (set in the schema):
      - synchronous=NORMAL skips the per-commit fsync; durable up to the last
        checkpoint (SQLite auto-checkpoints at WAL ~1000 frames).
      - cache_size=-65536 = 64 MB page cache (reduces re-reads).
      - temp_store=MEMORY keeps temp tables in RAM (no /tmp churn).
      - mmap_size=256 MB lets reads come from page-cache without read() syscalls.

    Migration probes (``executescript(SCHEMA_SQL)`` + the forward migrations)
    run once per (resolved path, process). Subsequent ``connect()`` calls on
    the same path skip migration entirely — see #000026 Phase 1. The memo is
    keyed by path only, so a caller that replaces the file at the same path
    must call ``invalidate_migration_cache(path)`` first.

    ``busy_timeout`` is set on every connection (before the migration
    pass, so it covers that too): a peer mid-write — migration DDL, a
    ``transaction()`` block, ``append_audit``'s own ``BEGIN IMMEDIATE`` —
    makes us *wait* rather than fail fast with ``database is locked``.
    Without it, concurrent appenders that fail-and-retry can re-read a
    stale chain head and fork the audit chain (qa.db seq 7724/7725 was
    that bug); waiting + the ``BEGIN IMMEDIATE`` serialization fixes it.

    Args:
        db_path: Location of the SQLite shard. Missing parent directories
            are created.

    Returns:
        An autocommit connection (``isolation_level=None``; callers issue
        ``BEGIN`` themselves) using ``sqlite3.Row`` as row factory.

    Raises:
        sqlite3.Error: Propagated unchanged from the schema script, a
            migration, or a PRAGMA. The half-initialised connection is
            closed before the exception leaves this function, so a failed
            open never leaks a handle (or a lock held through it).
    """
    p = Path(db_path)
    p.parent.mkdir(parents=True, exist_ok=True)
    conn = sqlite3.connect(p, isolation_level=None)  # autocommit; we'll BEGIN manually
    try:
        conn.row_factory = sqlite3.Row
        # Wait on a peer's write lock instead of failing fast.
        conn.execute("PRAGMA busy_timeout = 5000")

        cache_key = str(p.resolve())
        if cache_key not in _MIGRATED_SHARDS:
            conn.executescript(SCHEMA_SQL)
            _migrate_audit_mode(conn)
            _migrate_mesh_peer_chains(conn)
            _migrate_document_http_meta(conn)
            _migrate_selfmodel_tables(conn)
            _migrate_capital_ledger(conn)
            _migrate_memory_root(conn)
            _migrate_adapter_loss_reports(conn)
            _migrate_controller_events(conn)
            _migrate_fork_score_branches(conn)
            # Recorded only after every step succeeded, so a failed pass is
            # retried by the next connect().
            _MIGRATED_SHARDS.add(cache_key)

        # Per-connection state — must run on EVERY open. SQLite scopes
        # `foreign_keys` per-connection (not per-DB), so the FK-CASCADE
        # behavior our schema relies on requires this PRAGMA every time
        # we open a connection. The other settings are tuning flags that
        # also live per-connection.
        conn.execute("PRAGMA foreign_keys = ON")
        conn.execute("PRAGMA synchronous = NORMAL")
        conn.execute("PRAGMA cache_size = -65536")
        conn.execute("PRAGMA temp_store = MEMORY")
        conn.execute("PRAGMA mmap_size = 268435456")
    except BaseException:
        # The caller never receives `conn` on this path, so nobody else can
        # close it; release the handle deterministically, then re-raise.
        conn.close()
        raise
    return conn
def _add_column_if_missing(
    conn: sqlite3.Connection, existing: set[str], column: str, ddl: str
) -> None:
    """Apply an ``ALTER TABLE … ADD COLUMN`` statement at most once.

    SQLite offers no ``ADD COLUMN IF NOT EXISTS``, so idempotence is built
    from two layers:

    * ``existing`` (the caller's PRAGMA-derived column set) is the cheap
      common-case skip;
    * a ``duplicate column name`` error is swallowed, which covers the
      TOCTOU loser when two ``connect()`` calls race on a fresh shard and
      both decide to ADD. The competing DDL comes from the same codebase,
      so the column the winner installed matches ours.

    Args:
        conn: Connection on which to run the statement.
        existing: Column names already known to be present on the table.
            Not modified by this function.
        column: Name of the column that ``ddl`` adds.
        ddl: Complete ``ALTER TABLE`` statement, executed verbatim.

    Raises:
        sqlite3.OperationalError: For any failure other than the column
            already existing.
    """
    if column not in existing:
        try:
            conn.execute(ddl)
        except sqlite3.OperationalError as err:
            message = str(err).lower()
            if "duplicate column name" in message:
                # Lost the race to a concurrent migrator; nothing left to do.
                return
            raise
def _migrate_audit_mode(conn: sqlite3.Connection) -> None:
    """Bring an older ``providence_cache`` table up to the current shape.

    Step 1 — columns. Adds ``audit_mode``, ``n_quotes``, ``n_verified``,
    ``unverified_quotes``, ``verifier_method``, ``run_dag_root`` and
    ``run_dag_blob`` to shards created before those columns existed. SQLite
    ``ALTER TABLE ADD COLUMN`` is O(1) (metadata-only), so the pass is
    cheap. It is idempotent on repeat opens (the PRAGMA column set is the
    fast path) and under concurrent ``connect()`` calls (the
    duplicate-column catch inside ``_add_column_if_missing``).

    Step 2 — CHECK constraints. SQLite cannot alter a column's CHECK in
    place, so each widening of the allowed value set is detected by looking
    for a marker literal in the table's stored DDL and, when needed,
    handled by the matching ``_rebuild_providence_cache_*`` helper:

    * ``'VISUAL'`` still present   -> VISUAL → UNGROUNDED rename;
    * ``'paraphrase'`` missing     -> paraphrase verifier method;
    * ``'claim_lattice'`` missing  -> quote-by-pointer verifier method (G0);
    * ``'CANONICAL_PROJECTION'`` or ``'canonical_projection'`` missing
      -> #000027, which extends both the ``audit_mode`` and the
      ``verifier_method`` CHECK in one rebuild.

    The DDL is re-read before every probe because a rebuild replaces it.
    According to the notes this replaces, each rebuild helper wraps itself
    in ``BEGIN IMMEDIATE`` so a failure rolls back cleanly, and a
    concurrent ``connect()`` waits on that write transaction
    (``connect()`` sets ``busy_timeout`` before running migrations) and
    then re-runs its own probes harmlessly.

    Step 3 — creates ``idx_providence_audit`` on ``audit_mode``. The index
    lives here rather than in the schema script because on legacy shards
    the column only exists after step 1.

    Args:
        conn: Connection whose row factory supports access by column name
            (``row["name"]``), as set up by ``connect()``.
    """
    present = {
        info["name"] for info in conn.execute("PRAGMA table_info(providence_cache)")
    }

    # (column, DDL) pairs, applied strictly in this order.
    additions = (
        (
            "audit_mode",
            "ALTER TABLE providence_cache ADD COLUMN audit_mode TEXT "
            "NOT NULL DEFAULT 'UNGROUNDED' "
            "CHECK (audit_mode IN ('STRICT','HYBRID','UNGROUNDED'))",
        ),
        (
            "n_quotes",
            "ALTER TABLE providence_cache ADD COLUMN n_quotes INTEGER NOT NULL DEFAULT 0",
        ),
        (
            "n_verified",
            "ALTER TABLE providence_cache ADD COLUMN n_verified INTEGER NOT NULL DEFAULT 0",
        ),
        (
            "unverified_quotes",
            "ALTER TABLE providence_cache ADD COLUMN unverified_quotes TEXT",
        ),
        (
            "verifier_method",
            "ALTER TABLE providence_cache ADD COLUMN verifier_method TEXT "
            "NOT NULL DEFAULT 'none' "
            "CHECK (verifier_method IN ('quote','span','entity','paraphrase','none'))",
        ),
        (
            "run_dag_root",
            "ALTER TABLE providence_cache ADD COLUMN run_dag_root TEXT",
        ),
        (
            "run_dag_blob",
            "ALTER TABLE providence_cache ADD COLUMN run_dag_blob TEXT",
        ),
    )
    for column, statement in additions:
        _add_column_if_missing(conn, present, column, statement)

    def stored_ddl() -> str | None:
        """Return the table's current CREATE text, or None if there is no row."""
        found = conn.execute(
            "SELECT sql FROM sqlite_master WHERE type='table' AND name='providence_cache'"
        ).fetchone()
        if not found:
            return None
        return found[0] or ""

    # VISUAL → UNGROUNDED rename: legacy CHECK still lists 'VISUAL'.
    table_sql = stored_ddl()
    if table_sql is not None and "'VISUAL'" in table_sql:
        _rebuild_providence_cache_ungrounded(conn)

    # paraphrase verifier_method: rebuild when the CHECK does not list it yet.
    table_sql = stored_ddl()
    if table_sql is not None and "'paraphrase'" not in table_sql:
        _rebuild_providence_cache_paraphrase(conn)

    # claim_lattice verifier_method (G0 — quote-by-pointer mode).
    table_sql = stored_ddl()
    if table_sql is not None and "'claim_lattice'" not in table_sql:
        _rebuild_providence_cache_claim_lattice(conn)

    # canonical_projection (#000027): CANONICAL_PROJECTION is the audit_mode
    # class for deterministic π* kernel answers, 'canonical_projection' the
    # verifier-method token. Either one missing triggers a single rebuild.
    table_sql = stored_ddl()
    if table_sql is not None and not (
        "'CANONICAL_PROJECTION'" in table_sql
        and "'canonical_projection'" in table_sql
    ):
        _rebuild_providence_cache_canonical_projection(conn)

    conn.execute(
        "CREATE INDEX IF NOT EXISTS idx_providence_audit "
        "ON providence_cache(audit_mode)"
    )
def _migrate_document_http_meta(conn: sqlite3.Connection) -> None:
    """Create ``document_http_meta`` on shards that pre-date recrawl checks.

    Probes ``sqlite_master`` for the table and returns immediately when it
    is already there (whatever its shape); otherwise issues the CREATE.
    Safe to call repeatedly — the CREATE itself also carries
    ``IF NOT EXISTS``.

    Args:
        conn: Connection to the shard being migrated.
    """
    already_there = conn.execute(
        "SELECT name FROM sqlite_master WHERE type='table' AND name='document_http_meta'"
    ).fetchone()
    if already_there is not None:
        return

    create_stmt = (
        "CREATE TABLE IF NOT EXISTS document_http_meta ("
        " document_root TEXT PRIMARY KEY,"
        " etag TEXT,"
        " last_modified TEXT,"
        " last_fetched_at INTEGER NOT NULL,"
        " last_status INTEGER,"
        " last_checked_at INTEGER,"
        " FOREIGN KEY (document_root) REFERENCES documents(document_root)"
        " ON DELETE CASCADE"
        ")"
    )
    conn.execute(create_stmt)
def _migrate_selfmodel_tables(conn: sqlite3.Connection) -> None:
    """Forward-migrate to add SelfModel tables (ticket #000014).

    Ensures ``selfmodel_records`` + ``selfmodel_capability_claims`` and
    their indexes exist on DBs that pre-date SelfModel landing.

    Each table is probed in ``sqlite_master`` and created only when
    absent. The ``CREATE INDEX IF NOT EXISTS`` statements run on every
    call: they are no-ops when the index is already there, and running
    them unconditionally repairs a shard whose table exists but whose
    index is missing (for example a migration interrupted between
    statements). Safe to call repeatedly.

    SelfModel rows are advisory by default — they do not enter
    cache_key unless ``governance_policy.selfmodel_binding`` is
    flipped on (per ticket #000014 §2.4).
    """
    row = conn.execute(
        "SELECT name FROM sqlite_master "
        "WHERE type='table' AND name='selfmodel_records'"
    ).fetchone()
    if row is None:
        conn.execute(
            "CREATE TABLE IF NOT EXISTS selfmodel_records ("
            " selfmodel_root TEXT PRIMARY KEY,"
            " schema_version TEXT NOT NULL,"
            " parent_selfmodel_root TEXT,"
            " model_profile_hash TEXT NOT NULL,"
            " verifier_method_root TEXT NOT NULL,"
            " governance_policy_hash TEXT NOT NULL,"
            " canonicalization_version TEXT NOT NULL,"
            " chunking_version TEXT NOT NULL,"
            " accepted_patch_root TEXT,"
            " rejected_patch_root TEXT,"
            " memory_root TEXT,"
            " state TEXT NOT NULL DEFAULT 'live'"
            " CHECK (state IN ('live','stale','falsified')),"
            " body_blob BLOB NOT NULL,"
            " audit_event_hash TEXT NOT NULL,"
            " created_at INTEGER NOT NULL,"
            " falsified_at INTEGER,"
            " falsified_reason TEXT"
            ")"
        )
    # Index DDL is idempotent, so it is not gated on the table probe.
    conn.execute(
        "CREATE INDEX IF NOT EXISTS idx_selfmodel_state ON selfmodel_records(state)"
    )
    conn.execute(
        "CREATE INDEX IF NOT EXISTS idx_selfmodel_parent "
        "ON selfmodel_records(parent_selfmodel_root)"
    )

    row = conn.execute(
        "SELECT name FROM sqlite_master "
        "WHERE type='table' AND name='selfmodel_capability_claims'"
    ).fetchone()
    if row is None:
        conn.execute(
            "CREATE TABLE IF NOT EXISTS selfmodel_capability_claims ("
            " claim_hash TEXT PRIMARY KEY,"
            " selfmodel_root TEXT NOT NULL,"
            " metric TEXT NOT NULL,"
            " threshold REAL NOT NULL,"
            " eval_digest TEXT NOT NULL,"
            " measured_value REAL,"
            " measured_at INTEGER,"
            " validity_horizon TEXT,"
            " body_blob BLOB NOT NULL,"
            " FOREIGN KEY (selfmodel_root) REFERENCES selfmodel_records(selfmodel_root)"
            ")"
        )
    conn.execute(
        "CREATE INDEX IF NOT EXISTS idx_claim_metric "
        "ON selfmodel_capability_claims(metric)"
    )
    conn.execute(
        "CREATE INDEX IF NOT EXISTS idx_claim_selfmodel "
        "ON selfmodel_capability_claims(selfmodel_root)"
    )
def _migrate_capital_ledger(conn: sqlite3.Connection) -> None:
    """Forward-migrate to add capital-cost ledger (ticket #000020).

    Ensures ``capital_ledger`` and its two indexes exist on DBs that
    pre-date the 8-capital-form cost attribution layer. Sibling table —
    does NOT enter audit_events.event_hash preimage.

    The table is probed in ``sqlite_master`` and created only when
    absent. The ``CREATE INDEX IF NOT EXISTS`` statements run on every
    call: no-ops when the index exists, and a repair when the table is
    present but an index is missing. Safe to call repeatedly.
    """
    row = conn.execute(
        "SELECT name FROM sqlite_master "
        "WHERE type='table' AND name='capital_ledger'"
    ).fetchone()
    if row is None:
        conn.execute(
            "CREATE TABLE IF NOT EXISTS capital_ledger ("
            " ledger_id INTEGER PRIMARY KEY AUTOINCREMENT,"
            " audit_event_hash TEXT NOT NULL,"
            " op_type TEXT NOT NULL,"
            " living REAL NOT NULL DEFAULT 0,"
            " material REAL NOT NULL DEFAULT 0,"
            " financial REAL NOT NULL DEFAULT 0,"
            " intellectual REAL NOT NULL DEFAULT 0,"
            " experiential REAL NOT NULL DEFAULT 0,"
            " social REAL NOT NULL DEFAULT 0,"
            " cultural REAL NOT NULL DEFAULT 0,"
            " spiritual REAL NOT NULL DEFAULT 0,"
            " estimator_version TEXT NOT NULL,"
            " estimator_inputs_blob TEXT,"
            " recorded_at INTEGER NOT NULL"
            ")"
        )
    # Index DDL is idempotent, so it is not gated on the table probe.
    conn.execute(
        "CREATE INDEX IF NOT EXISTS idx_capital_ledger_audit "
        "ON capital_ledger(audit_event_hash)"
    )
    conn.execute(
        "CREATE INDEX IF NOT EXISTS idx_capital_ledger_op "
        "ON capital_ledger(op_type)"
    )
def _migrate_controller_events(conn: sqlite3.Connection) -> None:
    """Forward-migrate to add controller_events (ticket #000037 Phase 2).

    Ensures ``controller_events`` and its three indexes exist on DBs that
    pre-date the Prometheus-Sigma advisory-write layer. Sibling table —
    does NOT enter audit_events.event_hash preimage. Operators can
    query / aggregate / prune controller_events without integrity risk.

    Three event kinds populate this table:

    - ``controller_decision`` — one per ControllerDecision
    - ``controller_difficulty`` — one per ControllerDecision
      (records difficulty_next)
    - ``controller_budget_allocation`` — one per nonzero allocation
      branch in the decision

    Rows are deduplicated via UNIQUE (event_kind, body_hash). body_hash
    is the sha256 of canonical-JSON-encoded body.

    The table is probed in ``sqlite_master`` and created only when
    absent. The ``CREATE INDEX IF NOT EXISTS`` statements run on every
    call: no-ops when the index exists, and a repair when the table is
    present but an index is missing. Safe to call repeatedly.
    """
    row = conn.execute(
        "SELECT name FROM sqlite_master "
        "WHERE type='table' AND name='controller_events'"
    ).fetchone()
    if row is None:
        conn.execute(
            "CREATE TABLE IF NOT EXISTS controller_events ("
            " event_id INTEGER PRIMARY KEY AUTOINCREMENT,"
            " organism_root TEXT NOT NULL,"
            " branch_id TEXT,"
            " event_kind TEXT NOT NULL,"
            " label TEXT,"
            " entropy REAL,"
            " difficulty REAL,"
            " allocation REAL,"
            " body_blob TEXT NOT NULL,"
            " body_hash TEXT NOT NULL,"
            " recorded_at INTEGER NOT NULL,"
            " UNIQUE (event_kind, body_hash)"
            ")"
        )
    # Index DDL is idempotent, so it is not gated on the table probe.
    conn.execute(
        "CREATE INDEX IF NOT EXISTS idx_controller_events_organism "
        "ON controller_events(organism_root)"
    )
    conn.execute(
        "CREATE INDEX IF NOT EXISTS idx_controller_events_kind "
        "ON controller_events(event_kind)"
    )
    conn.execute(
        "CREATE INDEX IF NOT EXISTS idx_controller_events_at "
        "ON controller_events(recorded_at)"
    )
def _migrate_fork_score_branches(conn: sqlite3.Connection) -> None:
    """Forward-migrate to add fork_score_branches (ticket #000012 Phase 1c).

    Ensures the sibling table that lets ``arborist substrate score``
    persist multi-branch checkpoints exists, together with its two
    indexes. Sibling — does NOT enter ``audit_events.event_hash``
    preimage, so re-scoring or back-filling cannot break the audit
    chain. PK ``(branch_set_id, branch_id)`` so re-scoring the same fork
    under the same set is a clean upsert, not a duplicate row.
    Default-off at the CLI surface (``--branch-set`` flag absent ⇒ no
    row written).

    Feeds #000037 §12 Trigger 1 — once a checkpoint accumulates ≥4
    branches in this table, the multi-branch path of the
    Prometheus-Σ controller has empirical data to fire on.

    The table is probed in ``sqlite_master`` and created only when
    absent. The ``CREATE INDEX IF NOT EXISTS`` statements run on every
    call: no-ops when the index exists, and a repair when the table is
    present but an index is missing. Safe to call repeatedly.
    """
    row = conn.execute(
        "SELECT name FROM sqlite_master "
        "WHERE type='table' AND name='fork_score_branches'"
    ).fetchone()
    if row is None:
        conn.execute(
            "CREATE TABLE IF NOT EXISTS fork_score_branches ("
            " branch_set_id TEXT NOT NULL,"
            " branch_id TEXT NOT NULL,"
            " parent_root TEXT NOT NULL,"
            " child_root TEXT,"
            " score REAL NOT NULL,"
            " verdict TEXT NOT NULL,"
            " breakdown_blob TEXT NOT NULL,"
            " weights_id TEXT NOT NULL,"
            " estimator_version TEXT NOT NULL,"
            " recorded_at INTEGER NOT NULL,"
            " PRIMARY KEY (branch_set_id, branch_id)"
            ")"
        )
    # Index DDL is idempotent, so it is not gated on the table probe.
    conn.execute(
        "CREATE INDEX IF NOT EXISTS idx_fork_score_branches_set "
        "ON fork_score_branches(branch_set_id)"
    )
    conn.execute(
        "CREATE INDEX IF NOT EXISTS idx_fork_score_branches_parent "
        "ON fork_score_branches(parent_root)"
    )
def _migrate_memory_root(conn: sqlite3.Connection) -> None:
    """Forward-migrate to add memory-root tables (ticket #000017).

    Ensures ``memory_records`` + ``memory_branch_summaries`` and their
    indexes exist on DBs that pre-date the lifelong-learning audit
    summary layer. Memory snapshots are advisory by default — they do
    not enter cache_key unless ``governance_policy.memory_binding`` is
    flipped on. See ticket #000017 for branch-projection semantics.

    Each table is probed in ``sqlite_master`` and created only when
    absent. The ``CREATE INDEX IF NOT EXISTS`` statements run on every
    call: no-ops when the index exists, and a repair when the table is
    present but an index is missing. Safe to call repeatedly.
    """
    row = conn.execute(
        "SELECT name FROM sqlite_master "
        "WHERE type='table' AND name='memory_records'"
    ).fetchone()
    if row is None:
        conn.execute(
            "CREATE TABLE IF NOT EXISTS memory_records ("
            " memory_root TEXT PRIMARY KEY,"
            " schema_version TEXT NOT NULL,"
            " parent_memory_root TEXT,"
            " audit_events_high_water TEXT NOT NULL,"
            " branch_summaries_blob BLOB NOT NULL,"
            " state TEXT NOT NULL DEFAULT 'live'"
            " CHECK (state IN ('live','stale','falsified')),"
            " audit_event_hash TEXT NOT NULL,"
            " created_at INTEGER NOT NULL,"
            " falsified_at INTEGER,"
            " falsified_reason TEXT"
            ")"
        )
    # Index DDL is idempotent, so it is not gated on the table probe.
    conn.execute(
        "CREATE INDEX IF NOT EXISTS idx_memory_state ON memory_records(state)"
    )

    row = conn.execute(
        "SELECT name FROM sqlite_master "
        "WHERE type='table' AND name='memory_branch_summaries'"
    ).fetchone()
    if row is None:
        conn.execute(
            "CREATE TABLE IF NOT EXISTS memory_branch_summaries ("
            " branch_id TEXT NOT NULL,"
            " memory_root TEXT NOT NULL,"
            " summary_digest TEXT NOT NULL,"
            " summary_blob BLOB NOT NULL,"
            " count INTEGER NOT NULL,"
            " PRIMARY KEY (branch_id, memory_root),"
            " FOREIGN KEY (memory_root) REFERENCES memory_records(memory_root)"
            ")"
        )
    conn.execute(
        "CREATE INDEX IF NOT EXISTS idx_branch_memory "
        "ON memory_branch_summaries(memory_root)"
    )
def _migrate_adapter_loss_reports(conn: sqlite3.Connection) -> None:
    """Forward-migrate to add adapter_loss_reports (ticket #000022).

    Sidecar — does NOT enter cache_key, document_root, run_dag_root,
    audit_events.event_hash preimage.

    The table is probed in ``sqlite_master`` and created only when
    absent. The ``CREATE INDEX IF NOT EXISTS`` statements run on every
    call: no-ops when the index exists, and a repair when the table is
    present but an index is missing. Safe to call repeatedly.
    """
    row = conn.execute(
        "SELECT name FROM sqlite_master "
        "WHERE type='table' AND name='adapter_loss_reports'"
    ).fetchone()
    if row is None:
        conn.execute(
            "CREATE TABLE IF NOT EXISTS adapter_loss_reports ("
            " chunk_id INTEGER NOT NULL,"
            " document_root TEXT NOT NULL,"
            " stage TEXT NOT NULL,"
            " canonicalization_version TEXT NOT NULL,"
            " loss_kind TEXT NOT NULL,"
            " loss_mode TEXT NOT NULL"
            " CHECK (loss_mode IN ('pure_drop','transform','quarantine','normalize')),"
            " bytes_dropped INTEGER NOT NULL,"
            " occurrence_count INTEGER NOT NULL,"
            " input_length_bytes INTEGER,"
            " output_length_bytes INTEGER,"
            " sample_excerpt TEXT,"
            " sample_hash TEXT,"
            " adapter_name TEXT,"
            " adapter_version TEXT,"
            " loss_report_policy_hash TEXT NOT NULL,"
            " created_at INTEGER NOT NULL,"
            " PRIMARY KEY (chunk_id, stage, canonicalization_version, loss_kind)"
            ")"
        )
    # Index DDL is idempotent, so it is not gated on the table probe.
    conn.execute(
        "CREATE INDEX IF NOT EXISTS idx_adapter_loss_doc "
        "ON adapter_loss_reports(document_root, stage)"
    )
    conn.execute(
        "CREATE INDEX IF NOT EXISTS idx_adapter_loss_kind "
        "ON adapter_loss_reports(loss_kind, stage)"
    )
    conn.execute(
        "CREATE INDEX IF NOT EXISTS idx_adapter_loss_chunk "
        "ON adapter_loss_reports(chunk_id, stage)"
    )
def _migrate_mesh_peer_chains(conn: sqlite3.Connection) -> None:
    """Ensure ``mesh_peer_chains`` exists on pre-mesh-fork-detection shards.

    Brand-new shards get the table from the CREATE TABLE IF NOT EXISTS in
    SCHEMA_SQL; this migration is a belt-and-suspenders check for callers
    that bypass the full SCHEMA_SQL pass (cross-shard query views, etc.).
    It looks the table up in ``sqlite_master`` and issues the CREATE only
    when the lookup comes back empty, so repeated calls are harmless.
    """
    probe_sql = (
        "SELECT name FROM sqlite_master WHERE type='table' AND name='mesh_peer_chains'"
    )
    if conn.execute(probe_sql).fetchone() is not None:
        # Per-peer chain table already present.
        return
    create_sql = (
        "CREATE TABLE IF NOT EXISTS mesh_peer_chains ("
        " peer_member_id TEXT PRIMARY KEY,"
        " last_event_hash TEXT NOT NULL,"
        " last_seq INTEGER NOT NULL,"
        " last_seen_at INTEGER NOT NULL"
        ")"
    )
    conn.execute(create_sql)
def _rebuild_providence_cache_paraphrase(conn: sqlite3.Connection) -> None:
    """One-time table rebuild: expand verifier_method CHECK to include
    'paraphrase' (4th verifier strategy: token-coverage paraphrase
    matching, alongside quote/span/entity).

    Same pattern as ``_rebuild_providence_cache_ungrounded``: SQLite
    can't modify a CHECK constraint in place, so we create a new table
    with the expanded CHECK, copy the data verbatim (no value
    translation needed — paraphrase is additive), drop the old, rename
    the new. Caller probes existing CHECK from sqlite_master and only
    invokes this when 'paraphrase' is missing.

    ``run_dag_root`` / ``run_dag_blob`` are carried over whenever the
    source table already has them, so populated run provenance survives
    the rebuild; on a source table without those columns they are simply
    left NULL in the new table.

    The whole rebuild runs inside ``BEGIN IMMEDIATE``; any failure is
    rolled back and re-raised. The connection must not already be inside
    a transaction.
    """
    new_create = """
    CREATE TABLE providence_cache_new (
        cache_key TEXT PRIMARY KEY,
        source_root TEXT NOT NULL,
        document_uri TEXT NOT NULL,
        question_hash TEXT NOT NULL,
        question_text TEXT NOT NULL,
        answer_text TEXT NOT NULL,
        merkle_proof TEXT NOT NULL,
        model_profile_hash TEXT NOT NULL,
        conversation_hash TEXT NOT NULL,
        governance_policy_hash TEXT NOT NULL,
        schema_version TEXT NOT NULL,
        canonicalization_version TEXT NOT NULL,
        chunking_version TEXT NOT NULL,
        falsification_state TEXT NOT NULL DEFAULT 'live'
            CHECK (falsification_state IN ('live','failed','stale','quarantined')),
        chain TEXT NOT NULL DEFAULT 'private'
            CHECK (chain IN ('private','public')),
        audit_event_hash TEXT,
        created_at INTEGER NOT NULL,
        last_hit_at INTEGER,
        hit_count INTEGER NOT NULL DEFAULT 0,
        audit_mode TEXT NOT NULL DEFAULT 'UNGROUNDED'
            CHECK (audit_mode IN ('STRICT','HYBRID','UNGROUNDED')),
        n_quotes INTEGER NOT NULL DEFAULT 0,
        n_verified INTEGER NOT NULL DEFAULT 0,
        unverified_quotes TEXT,
        verifier_method TEXT NOT NULL DEFAULT 'none'
            CHECK (verifier_method IN ('quote','span','entity','paraphrase','none')),
        run_dag_root TEXT,
        run_dag_blob TEXT
    )
    """
    # Columns copied unconditionally from the old table.
    copy_columns = [
        "cache_key", "source_root", "document_uri", "question_hash",
        "question_text", "answer_text", "merkle_proof", "model_profile_hash",
        "conversation_hash", "governance_policy_hash", "schema_version",
        "canonicalization_version", "chunking_version", "falsification_state",
        "chain", "audit_event_hash", "created_at", "last_hit_at", "hit_count",
        "audit_mode", "n_quotes", "n_verified", "unverified_quotes",
        "verifier_method",
    ]
    conn.execute("BEGIN IMMEDIATE")
    try:
        # PRAGMA table_info rows are (cid, name, type, ...); index 1 is the name.
        present = {
            info[1] for info in conn.execute("PRAGMA table_info(providence_cache)")
        }
        # Only reference the run_dag columns if the source table has them,
        # otherwise the SELECT would fail with "no such column".
        copy_columns += [
            name for name in ("run_dag_root", "run_dag_blob") if name in present
        ]
        column_sql = ", ".join(copy_columns)
        conn.execute(new_create)
        conn.execute(
            f"INSERT INTO providence_cache_new ({column_sql}) "
            f"SELECT {column_sql} FROM providence_cache"
        )
        conn.execute("DROP TABLE providence_cache")
        conn.execute("ALTER TABLE providence_cache_new RENAME TO providence_cache")
        conn.execute("COMMIT")
    except Exception:
        conn.execute("ROLLBACK")
        raise
def _rebuild_providence_cache_claim_lattice(conn: sqlite3.Connection) -> None:
    """One-time table rebuild admitting 'claim_lattice' as a
    verifier_method (quote-by-pointer answer mode added in G0).

    SQLite cannot alter a CHECK constraint in place, so the table is
    recreated: build ``providence_cache_new`` with the wider CHECK, copy
    every row verbatim (the new token is additive, so no values are
    translated), drop the old table and rename the new one into place.
    The caller inspects the existing DDL in sqlite_master and only calls
    this when 'claim_lattice' is missing.

    The copy includes ``run_dag_root`` and ``run_dag_blob``, so per-run
    computation provenance already stored in those columns is kept.

    Everything runs inside ``BEGIN IMMEDIATE``; on any ``Exception`` the
    transaction is rolled back and the error re-raised.
    """
    create_sql = """
    CREATE TABLE providence_cache_new (
        cache_key TEXT PRIMARY KEY,
        source_root TEXT NOT NULL,
        document_uri TEXT NOT NULL,
        question_hash TEXT NOT NULL,
        question_text TEXT NOT NULL,
        answer_text TEXT NOT NULL,
        merkle_proof TEXT NOT NULL,
        model_profile_hash TEXT NOT NULL,
        conversation_hash TEXT NOT NULL,
        governance_policy_hash TEXT NOT NULL,
        schema_version TEXT NOT NULL,
        canonicalization_version TEXT NOT NULL,
        chunking_version TEXT NOT NULL,
        falsification_state TEXT NOT NULL DEFAULT 'live'
            CHECK (falsification_state IN ('live','failed','stale','quarantined')),
        chain TEXT NOT NULL DEFAULT 'private'
            CHECK (chain IN ('private','public')),
        audit_event_hash TEXT,
        created_at INTEGER NOT NULL,
        last_hit_at INTEGER,
        hit_count INTEGER NOT NULL DEFAULT 0,
        audit_mode TEXT NOT NULL DEFAULT 'UNGROUNDED'
            CHECK (audit_mode IN ('STRICT','HYBRID','UNGROUNDED')),
        n_quotes INTEGER NOT NULL DEFAULT 0,
        n_verified INTEGER NOT NULL DEFAULT 0,
        unverified_quotes TEXT,
        verifier_method TEXT NOT NULL DEFAULT 'none'
            CHECK (verifier_method IN ('quote','span','entity','paraphrase','claim_lattice','none')),
        run_dag_root TEXT,
        run_dag_blob TEXT
    )
    """
    # One column list, used for both the INSERT target and the SELECT source.
    carried = (
        "cache_key, source_root, document_uri, question_hash, question_text, "
        " answer_text, merkle_proof, model_profile_hash, conversation_hash, "
        " governance_policy_hash, schema_version, canonicalization_version, "
        " chunking_version, falsification_state, chain, audit_event_hash, "
        " created_at, last_hit_at, hit_count, audit_mode, n_quotes, "
        " n_verified, unverified_quotes, verifier_method, "
        " run_dag_root, run_dag_blob"
    )
    copy_sql = (
        "INSERT INTO providence_cache_new (" + carried + ") "
        "SELECT  " + carried + " FROM providence_cache"
    )
    statements = (
        create_sql,
        copy_sql,
        "DROP TABLE providence_cache",
        "ALTER TABLE providence_cache_new RENAME TO providence_cache",
        "COMMIT",
    )
    conn.execute("BEGIN IMMEDIATE")
    try:
        for statement in statements:
            conn.execute(statement)
    except Exception:
        conn.execute("ROLLBACK")
        raise
def _rebuild_providence_cache_canonical_projection(conn: sqlite3.Connection) -> None:
    """One-time table rebuild admitting 'CANONICAL_PROJECTION' as an
    audit_mode and 'canonical_projection' as a verifier_method
    (#000027 — canonical projections persist).

    Same temp-table approach as the paraphrase / claim_lattice rebuilds:
    SQLite cannot alter a CHECK constraint in place, so a new table with
    both widened CHECKs is created, rows are copied verbatim (the new
    tokens are additive, so no values are translated), the old table is
    dropped and the new one renamed into place.

    The caller inspects the existing DDL in sqlite_master and only calls
    this helper when EITHER token is missing from it.

    Everything runs inside ``BEGIN IMMEDIATE``; on any ``Exception`` the
    transaction is rolled back and the error re-raised.
    """
    create_sql = """
    CREATE TABLE providence_cache_new (
        cache_key TEXT PRIMARY KEY,
        source_root TEXT NOT NULL,
        document_uri TEXT NOT NULL,
        question_hash TEXT NOT NULL,
        question_text TEXT NOT NULL,
        answer_text TEXT NOT NULL,
        merkle_proof TEXT NOT NULL,
        model_profile_hash TEXT NOT NULL,
        conversation_hash TEXT NOT NULL,
        governance_policy_hash TEXT NOT NULL,
        schema_version TEXT NOT NULL,
        canonicalization_version TEXT NOT NULL,
        chunking_version TEXT NOT NULL,
        falsification_state TEXT NOT NULL DEFAULT 'live'
            CHECK (falsification_state IN ('live','failed','stale','quarantined')),
        chain TEXT NOT NULL DEFAULT 'private'
            CHECK (chain IN ('private','public')),
        audit_event_hash TEXT,
        created_at INTEGER NOT NULL,
        last_hit_at INTEGER,
        hit_count INTEGER NOT NULL DEFAULT 0,
        audit_mode TEXT NOT NULL DEFAULT 'UNGROUNDED'
            CHECK (audit_mode IN ('STRICT','HYBRID','UNGROUNDED','CANONICAL_PROJECTION')),
        n_quotes INTEGER NOT NULL DEFAULT 0,
        n_verified INTEGER NOT NULL DEFAULT 0,
        unverified_quotes TEXT,
        verifier_method TEXT NOT NULL DEFAULT 'none'
            CHECK (verifier_method IN ('quote','span','entity','paraphrase','claim_lattice','canonical_projection','none')),
        run_dag_root TEXT,
        run_dag_blob TEXT
    )
    """
    # One column list, used for both the INSERT target and the SELECT source.
    carried = (
        "cache_key, source_root, document_uri, question_hash, question_text, "
        " answer_text, merkle_proof, model_profile_hash, conversation_hash, "
        " governance_policy_hash, schema_version, canonicalization_version, "
        " chunking_version, falsification_state, chain, audit_event_hash, "
        " created_at, last_hit_at, hit_count, audit_mode, n_quotes, "
        " n_verified, unverified_quotes, verifier_method, "
        " run_dag_root, run_dag_blob"
    )
    copy_sql = (
        "INSERT INTO providence_cache_new (" + carried + ") "
        "SELECT  " + carried + " FROM providence_cache"
    )
    statements = (
        create_sql,
        copy_sql,
        "DROP TABLE providence_cache",
        "ALTER TABLE providence_cache_new RENAME TO providence_cache",
        "COMMIT",
    )
    conn.execute("BEGIN IMMEDIATE")
    try:
        for statement in statements:
            conn.execute(statement)
    except Exception:
        conn.execute("ROLLBACK")
        raise
def _rebuild_providence_cache_ungrounded(conn: sqlite3.Connection) -> None:
    """One-time table rebuild: rename audit_mode value VISUAL → UNGROUNDED.

    SQLite cannot modify a column's CHECK constraint in place. Standard
    pattern: create new table with new CHECK, copy data while translating
    values, drop old, rename new. Wrapped in IMMEDIATE transaction so a
    failure rolls back cleanly without leaving the DB half-migrated.

    ``run_dag_root`` / ``run_dag_blob`` are carried over whenever the
    source table already has them, so populated run provenance survives
    the rebuild; on a source table without those columns they are simply
    left NULL in the new table.

    Any ``Exception`` during the rebuild is re-raised after the rollback.
    The connection must not already be inside a transaction.
    """
    new_create = """
    CREATE TABLE providence_cache_new (
        cache_key TEXT PRIMARY KEY,
        source_root TEXT NOT NULL,
        document_uri TEXT NOT NULL,
        question_hash TEXT NOT NULL,
        question_text TEXT NOT NULL,
        answer_text TEXT NOT NULL,
        merkle_proof TEXT NOT NULL,
        model_profile_hash TEXT NOT NULL,
        conversation_hash TEXT NOT NULL,
        governance_policy_hash TEXT NOT NULL,
        schema_version TEXT NOT NULL,
        canonicalization_version TEXT NOT NULL,
        chunking_version TEXT NOT NULL,
        falsification_state TEXT NOT NULL DEFAULT 'live'
            CHECK (falsification_state IN ('live','failed','stale','quarantined')),
        chain TEXT NOT NULL DEFAULT 'private'
            CHECK (chain IN ('private','public')),
        audit_event_hash TEXT,
        created_at INTEGER NOT NULL,
        last_hit_at INTEGER,
        hit_count INTEGER NOT NULL DEFAULT 0,
        audit_mode TEXT NOT NULL DEFAULT 'UNGROUNDED'
            CHECK (audit_mode IN ('STRICT','HYBRID','UNGROUNDED')),
        n_quotes INTEGER NOT NULL DEFAULT 0,
        n_verified INTEGER NOT NULL DEFAULT 0,
        unverified_quotes TEXT,
        verifier_method TEXT NOT NULL DEFAULT 'none'
            CHECK (verifier_method IN ('quote','span','entity','paraphrase','none')),
        run_dag_root TEXT,
        run_dag_blob TEXT
    )
    """
    # Columns copied unconditionally from the old table.
    copy_columns = [
        "cache_key", "source_root", "document_uri", "question_hash",
        "question_text", "answer_text", "merkle_proof", "model_profile_hash",
        "conversation_hash", "governance_policy_hash", "schema_version",
        "canonicalization_version", "chunking_version", "falsification_state",
        "chain", "audit_event_hash", "created_at", "last_hit_at", "hit_count",
        "audit_mode", "n_quotes", "n_verified", "unverified_quotes",
        "verifier_method",
    ]
    conn.execute("BEGIN IMMEDIATE")
    try:
        # PRAGMA table_info rows are (cid, name, type, ...); index 1 is the name.
        present = {
            info[1] for info in conn.execute("PRAGMA table_info(providence_cache)")
        }
        # Only reference the run_dag columns if the source table has them,
        # otherwise the SELECT would fail with "no such column".
        copy_columns += [
            name for name in ("run_dag_root", "run_dag_blob") if name in present
        ]
        # Every column is copied as-is except audit_mode, which is translated.
        select_exprs = [
            "CASE WHEN audit_mode = 'VISUAL' THEN 'UNGROUNDED' ELSE audit_mode END"
            if name == "audit_mode"
            else name
            for name in copy_columns
        ]
        conn.execute(new_create)
        conn.execute(
            f"INSERT INTO providence_cache_new ({', '.join(copy_columns)}) "
            f"SELECT {', '.join(select_exprs)} FROM providence_cache"
        )
        conn.execute("DROP TABLE providence_cache")
        conn.execute("ALTER TABLE providence_cache_new RENAME TO providence_cache")
        conn.execute("COMMIT")
    except Exception:
        conn.execute("ROLLBACK")
        raise
# Tables that exist in every shard with the same schema. connect_query()
# iterates this tuple and creates one TEMP VIEW per name, each a UNION ALL
# of that table across every attached shard.
_SHARDABLE_TABLES = (
    "documents",
    "chunks",
    "merkle_nodes",
    "edges",
    "derivations",
    "providence_cache",
    "audit_events",
    "falsifications",
    "concept_relations",
    "concept_token_idf",
    "adapter_loss_reports",
)
# Per-table column lists for cross-shard UNION views. The `chunks` table
# is pinned explicitly because the column order matters for cross-shard
# search: chunks_fts is contentless and joins back to `chunks.chunk_id`.
# Mixing legacy (composite-PK, no chunk_id column) shards with current
# (chunk_id-keyed) shards in the same --shards-dir is unsupported — run
# the migration on legacy shards first or keep them in a separate dir.
# Tables without an entry here are unioned with `SELECT *` by
# connect_query().
_SHARED_COLUMNS = {
    "chunks": "chunk_id, document_root, idx, leaf_hash, content, tier",
}
[docs]
def discover_shards(shards_dir: Path | str) -> list[Path]:
"""Enumerate shard DB files in `shards_dir`. Returns sorted list of paths."""
p = Path(shards_dir)
if not p.is_dir():
return []
return sorted(p.glob("*.db"))
[docs]
def connect_query(
    db_path: Path | str | None = None,
    shards_dir: Path | str | None = None,
) -> sqlite3.Connection:
    """Open a read-only-style connection that surfaces ALL shards as one DB.

    If `shards_dir` is set, every `*.db` in it is ATTACHed to a fresh
    in-memory connection and UNION ALL temp views are created over the
    standard tables so existing queries (`SELECT * FROM documents`) work
    unchanged across shards. Reads only — writes still go through
    `connect()` against a specific shard. A temp table ``_shards``
    (shard_id, path, alias) records what was attached.

    If `shards_dir` is set but contains no shards, the in-memory DB is
    given the empty SCHEMA_SQL tables so callers don't crash.

    If `shards_dir` is None, returns a normal `connect(db_path)` for
    back-compat (`db_path` falls back to DEFAULT_DB_PATH).

    If attaching a shard or creating a view fails, the in-memory
    connection is closed before the error propagates.

    NOTE(review): SQLite caps the number of attached databases
    (compile-time SQLITE_MAX_ATTACHED, default 10) — confirm the limit of
    the deployed build before pointing this at a large shard set.
    """
    if shards_dir is None:
        return connect(db_path or DEFAULT_DB_PATH)
    shard_paths = discover_shards(shards_dir)
    conn = sqlite3.connect(":memory:", isolation_level=None)
    try:
        conn.row_factory = sqlite3.Row
        conn.execute("PRAGMA temp_store = MEMORY")
        if not shard_paths:
            # Nothing attached; create empty placeholder tables so callers don't crash.
            conn.executescript(SCHEMA_SQL)
            return conn
        aliases: list[str] = []
        for i, sp in enumerate(shard_paths):
            # Alias is generated here (sh000, sh001, …), never user input,
            # so interpolating it into the statement is safe.
            alias = f"sh{i:03d}"
            conn.execute(f"ATTACH DATABASE ? AS {alias}", (str(sp.resolve()),))
            aliases.append(alias)
        # UNION ALL views over the shardable tables. Only tables listed in
        # _SHARED_COLUMNS get an explicit, pinned column list; every other
        # table is unioned with `SELECT *`, so its column count and order
        # must match across all attached shards.
        for table in _SHARDABLE_TABLES:
            cols = _SHARED_COLUMNS.get(table, "*")
            unions = " UNION ALL ".join(
                f"SELECT {cols} FROM {a}.{table}" for a in aliases
            )
            conn.execute(f"CREATE TEMP VIEW {table} AS {unions}")
        # Stash the shard list for tools that want it.
        conn.execute(
            "CREATE TEMP TABLE _shards (shard_id TEXT, path TEXT, alias TEXT)"
        )
        conn.executemany(
            "INSERT INTO _shards (shard_id, path, alias) VALUES (?, ?, ?)",
            [(p.stem, str(p.resolve()), a) for p, a in zip(shard_paths, aliases)],
        )
    except BaseException:
        # Don't leak the half-built connection (and its attached shard
        # file handles) when setup fails part-way.
        conn.close()
        raise
    return conn
[docs]
def connect_readonly(db_path: Path | str) -> sqlite3.Connection:
"""Open an existing DB read-only — ``mode=ro`` URI, no schema
bootstrap, no migration probes.
For CLI paths that only walk data (audit-chain checks, per-shard
counts): a read op must neither run DDL nor take a write lock, but
``connect()`` does both — it runs ``executescript(SCHEMA_SQL)`` plus
the forward migrations on the first open of each file in a process.
(That migration pass on a read-only walk is exactly what surfaced
the ``fork_score_branches already exists`` crash.) Raises
``sqlite3.OperationalError`` if the file is missing or unreadable;
callers iterating a shard set should catch and skip if they expect
stragglers.
"""
conn = sqlite3.connect(
f"file:{Path(db_path)}?mode=ro", uri=True, isolation_level=None
)
conn.row_factory = sqlite3.Row
return conn
[docs]
@contextmanager
def transaction(conn: sqlite3.Connection) -> Iterator[sqlite3.Connection]:
    """BEGIN IMMEDIATE / COMMIT / ROLLBACK around a block.

    Yields `conn` itself. The block's work is committed when it exits
    normally and rolled back when it raises; the exception is always
    re-raised.

    ``BaseException`` is caught (not just ``Exception``) so that a
    ``KeyboardInterrupt`` / ``SystemExit`` raised inside the block also
    rolls back instead of leaving the IMMEDIATE transaction — and its
    write lock — open on the connection. This matches ``append_audit``.
    """
    conn.execute("BEGIN IMMEDIATE")
    try:
        yield conn
    except BaseException:
        # SQLite may already have rolled the transaction back itself on
        # some errors; issuing ROLLBACK then would raise and mask the
        # original exception.
        if conn.in_transaction:
            conn.execute("ROLLBACK")
        raise
    else:
        conn.execute("COMMIT")
def _canonical_json(obj) -> str:
    """Serialize `obj` to the stable JSON form used for audit hashing.

    Keys are sorted, separators carry no whitespace, and non-ASCII
    characters are emitted as-is rather than ``\\u`` escaped.
    """
    encoder = json.JSONEncoder(
        sort_keys=True,
        separators=(",", ":"),
        ensure_ascii=False,
    )
    return encoder.encode(obj)
[docs]
def latest_event_hash(conn: sqlite3.Connection) -> str | None:
"""Return the last event_hash in the audit chain, or None for genesis."""
row = conn.execute(
"SELECT event_hash FROM audit_events ORDER BY seq DESC LIMIT 1"
).fetchone()
return row["event_hash"] if row else None
[docs]
def chain_audit_events(
    prev_event_hash: str | None,
    events: list[dict],
) -> tuple[list[tuple], str | None]:
    """Compute the event_hash chain for a batch in pure Python.

    Each event dict needs: `event_type`, `body` (dict), `ts` (int);
    `subject_root` (str|None) is optional. An event's hash is the SHA-256
    of the previous hash's raw bytes (omitted for the first event when
    `prev_event_hash` is None) followed by the canonical-JSON body bytes.

    Returns (rows_for_executemany, last_event_hash); with no events the
    second item is `prev_event_hash` unchanged. Insert with:

        executemany("INSERT INTO audit_events
            (event_hash, prev_event_hash, event_type, subject_root,
            body, ts) VALUES (?, ?, ?, ?, ?, ?)", rows)

    All chain SHA-256s are computed locally — zero DB round-trips per event.
    """
    import hashlib

    chained_rows: list[tuple] = []
    head = prev_event_hash
    for event in events:
        payload = _canonical_json(event["body"])
        # Preimage: previous hash as raw bytes (nothing at genesis) + body.
        prefix = b"" if head is None else bytes.fromhex(head)
        digest = hashlib.sha256(
            prefix + payload.encode("utf-8", errors="surrogatepass")
        ).hexdigest()
        chained_rows.append(
            (
                digest,
                head,
                event["event_type"],
                event.get("subject_root"),
                payload,
                event["ts"],
            )
        )
        head = digest
    return chained_rows, head
[docs]
def append_audit(
    conn: sqlite3.Connection,
    event_type: str,
    body: dict,
    subject_root: str | None = None,
    ts: int | None = None,
) -> str:
    """Append one event to the audit chain. Returns the new event_hash (hex).

    The chain-head read and the INSERT are atomic. When the connection is
    not already inside a transaction, this call wraps both in its own
    ``BEGIN IMMEDIATE`` / ``COMMIT``, so two concurrent appenders
    serialize on the write lock instead of both reading the same head
    and chaining off it (which forks the chain; qa.db seq 7724/7725 was
    exactly that, from two concurrent ``providence_burn`` writes). When
    the caller is already inside a ``transaction()``, the append joins
    that unit and this call neither commits nor rolls back. With
    ``connect()``'s ``busy_timeout`` the loser waits rather than failing
    ``database is locked``.

    `ts` defaults to the current Unix time in whole seconds.

    Convenience wrapper for one-off events. Bulk inserts should use
    chain_audit_events() + executemany() inside a ``transaction()`` for
    ~10x throughput on large batches (same serialization guarantee).
    """
    import hashlib

    stamp = int(time.time()) if ts is None else ts
    payload = _canonical_json(body)
    # Only open — and later close — a transaction the caller didn't start.
    started_here = not conn.in_transaction
    if started_here:
        conn.execute("BEGIN IMMEDIATE")
    try:
        head = latest_event_hash(conn)
        # Preimage: previous hash as raw bytes (nothing at genesis) + body.
        prefix = b"" if head is None else bytes.fromhex(head)
        event_hash = hashlib.sha256(
            prefix + payload.encode("utf-8", errors="surrogatepass")
        ).hexdigest()
        conn.execute(
            "INSERT INTO audit_events (event_hash, prev_event_hash, event_type, subject_root, body, ts) "
            "VALUES (?, ?, ?, ?, ?, ?)",
            (event_hash, head, event_type, subject_root, payload, stamp),
        )
    except BaseException:
        if started_here:
            conn.execute("ROLLBACK")
        raise
    else:
        if started_here:
            conn.execute("COMMIT")
    return event_hash
[docs]
def stats(conn: sqlite3.Connection) -> dict:
    """Quick landscape report: row counts keyed by a short label.

    Runs one ``SELECT COUNT(*)`` per entry, in the order listed, and
    returns the counts in that same order.
    """
    count_queries = {
        "documents_total": "SELECT COUNT(*) FROM documents",
        "documents_surface": "SELECT COUNT(*) FROM documents WHERE kind='surface'",
        "documents_core": "SELECT COUNT(*) FROM documents WHERE kind='core'",
        "chunks_total": "SELECT COUNT(*) FROM chunks",
        "chunks_hot": "SELECT COUNT(*) FROM chunks WHERE tier='hot'",
        "chunks_warm": "SELECT COUNT(*) FROM chunks WHERE tier='warm'",
        "chunks_cold": "SELECT COUNT(*) FROM chunks WHERE tier='cold'",
        "edges_total": "SELECT COUNT(*) FROM edges",
        "providence_total": "SELECT COUNT(*) FROM providence_cache",
        "audit_events_total": "SELECT COUNT(*) FROM audit_events",
    }
    return {
        label: conn.execute(sql).fetchone()[0]
        for label, sql in count_queries.items()
    }