# Source code for arborist.store

"""SQLite-backed v9.8 store.

Schema implements the Merkle-AGI v9.8 admissibility ledger:

- 8-dim providence_cache key (source_root, question_hash,
  model_profile_hash, conversation_hash, governance_policy_hash,
  schema_version, canonicalization_version, chunking_version)
- falsification_state ∈ {live, failed, stale, quarantined}
- audit_events append-only chain (event_hash chains via prev_event_hash)
- documents.kind ∈ {surface, core} for layered compression
- chunks.tier ∈ {hot, warm, cold} for reversible eviction
- derivations table binds core docs back to source surface roots

The providence_cache layer is schema-only in Phase 0 — no Q&A inference yet.
"""

from __future__ import annotations

import json
import sqlite3
import time
from contextlib import contextmanager
from pathlib import Path
from typing import Iterator


# Default shard location: ~/.arborist/arborist.db (used when connect() gets no path).
DEFAULT_DB_PATH = Path.home().joinpath(".arborist", "arborist.db")


# Full DDL applied by ``connect()`` through ``conn.executescript`` the first
# time a path is opened in this process. Every CREATE uses IF NOT EXISTS, so
# re-running the script against an existing shard only creates what is
# missing. Columns added after a table first shipped are back-filled by the
# ``_migrate_*`` functions (e.g. ``_migrate_audit_mode``'s ALTER TABLEs), not
# by this script.
#
# This text is load-bearing beyond its DDL. SQLite stores each CREATE TABLE
# statement, including the ``--`` comments inside its parentheses, in
# sqlite_master.sql (per the SQLite schema-table docs).
# ``_migrate_audit_mode`` decides whether to rebuild providence_cache by
# substring-probing that stored text for ``'VISUAL'`` and ``'paraphrase'``.
# Edit providence_cache's inline comments with care: a single-quoted token
# there can satisfy or trip a probe. The verifier_method comment already
# contains ``'paraphrase'`` in single quotes. The audit_mode comment spells
# VISUAL with double quotes, which does not match the ``'VISUAL'`` probe.
#
# NOTE(review): the providence_cache audit_mode comment calls its value
# space a "trichotomy", but the CHECK also admits 'CANONICAL_PROJECTION'.
# NOTE(review): idx_adapter_loss_chunk (chunk_id, stage) and
# idx_citation_alias_orig (original_ref) repeat the leading columns of their
# tables' PRIMARY KEYs. SQLite already keeps an automatic index for those
# keys, so these look redundant. Confirm no caller depends on them before
# dropping.
# NOTE(review): documents has a TEXT primary key, so its hidden rowid is not
# pinned. An INSERT OR REPLACE deletes and re-inserts the row, which can
# assign a new rowid. Confirm that the ingest path deletes the old
# documents_fts row keyed on the previous rowid.
# NOTE(review): the FTS5 ``contentless_delete=1`` option requires a fairly
# recent SQLite (3.43+, per the FTS5 docs). Confirm that the sqlite3 library
# bundled with the supported Python builds meets that.
SCHEMA_SQL = """
PRAGMA journal_mode = WAL;
PRAGMA foreign_keys = ON;

CREATE TABLE IF NOT EXISTS schema_meta (
    key   TEXT PRIMARY KEY,
    value TEXT NOT NULL
);

-- Free-form per-DB metadata. Used by the resume mechanic to track each
-- source's high-water mark so a stopped ingest can rsync forward without
-- re-parsing rows that are already in this DB.
CREATE TABLE IF NOT EXISTS meta (
    key   TEXT PRIMARY KEY,
    value TEXT NOT NULL,
    updated_at INTEGER
);

-- Documents: surface (raw ingest) or core (distilled, Merkle-signed back).
CREATE TABLE IF NOT EXISTS documents (
    document_root            TEXT PRIMARY KEY,    -- hex sha256 of merkle root
    document_uri             TEXT NOT NULL,
    source_type              TEXT NOT NULL,
    kind                     TEXT NOT NULL DEFAULT 'surface'
                                CHECK (kind IN ('surface','core')),
    compression_depth        INTEGER NOT NULL DEFAULT 0,
    title                    TEXT,
    chunking_version         TEXT NOT NULL,
    canonicalization_version TEXT NOT NULL,
    schema_version           TEXT NOT NULL,
    ingest_ts                INTEGER NOT NULL,
    hit_count                INTEGER NOT NULL DEFAULT 0,
    last_hit_at              INTEGER
);
CREATE INDEX IF NOT EXISTS idx_documents_uri  ON documents(document_uri);
CREATE INDEX IF NOT EXISTS idx_documents_kind ON documents(kind);

-- Per-document HTTP metadata for cheap recrawl-checks. ETag and
-- Last-Modified come from the response headers at ingest time and let a
-- recrawl pass send conditional HEAD requests (If-None-Match /
-- If-Modified-Since); a 304 lets us skip the body fetch entirely.
-- last_status / last_checked_at record the most recent recheck so
-- operators can audit "when was this URL last verified."
CREATE TABLE IF NOT EXISTS document_http_meta (
    document_root    TEXT PRIMARY KEY,
    etag             TEXT,
    last_modified    TEXT,
    last_fetched_at  INTEGER NOT NULL,
    last_status      INTEGER,
    last_checked_at  INTEGER,
    FOREIGN KEY (document_root) REFERENCES documents(document_root) ON DELETE CASCADE
);

-- Chunks with tier-based reversible eviction.
-- content nullable: cold tier evicts content but retains leaf_hash + URI for
-- rehydration. Identity verified on rehydrate by recomputing leaf_hash.
--
-- chunk_id INTEGER PRIMARY KEY AUTOINCREMENT serves dual duty: it's both the
-- primary key and the rowid that the contentless FTS5 virtual table joins
-- against. The (document_root, idx) UNIQUE constraint preserves the prior
-- "one chunk per (doc, position)" invariant for callers that look up by it.
CREATE TABLE IF NOT EXISTS chunks (
    chunk_id      INTEGER PRIMARY KEY AUTOINCREMENT,
    document_root TEXT NOT NULL,
    idx           INTEGER NOT NULL,
    leaf_hash     TEXT NOT NULL,
    content       TEXT,
    tier          TEXT NOT NULL DEFAULT 'hot'
                     CHECK (tier IN ('hot','warm','cold')),
    UNIQUE (document_root, idx),
    FOREIGN KEY (document_root) REFERENCES documents(document_root) ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS idx_chunks_leaf ON chunks(leaf_hash);

-- Interior Merkle nodes (layer >= 1). Layer 0 lives in chunks.leaf_hash.
CREATE TABLE IF NOT EXISTS merkle_nodes (
    document_root TEXT NOT NULL,
    layer         INTEGER NOT NULL,
    idx           INTEGER NOT NULL,
    hash          TEXT NOT NULL,
    PRIMARY KEY (document_root, layer, idx),
    FOREIGN KEY (document_root) REFERENCES documents(document_root) ON DELETE CASCADE
);

-- Cross-links between documents (the forest).
-- Unresolved forward links (dst not yet ingested) carry dst_root='' and the
-- ingest pass backfills dst_root when the target appears.
--
-- WITHOUT ROWID: the PK covers every column, so a default rowid-based table
-- would near-duplicate the row data in the PK index. WITHOUT ROWID makes
-- the table itself a B-tree keyed on the PK and saves ~50% of edge storage
-- on real Wikipedia ingests (measured: 38 MB -> 21 MB / 1000 docs).
-- Behaviorally identical; only the on-disk layout changes.
CREATE TABLE IF NOT EXISTS edges (
    src_root  TEXT NOT NULL,
    dst_root  TEXT NOT NULL DEFAULT '',      -- '' = unresolved, backfilled later
    dst_uri   TEXT NOT NULL DEFAULT '',      -- always present so we can resolve later
    edge_type TEXT NOT NULL,                 -- wikilink, citation, derived_from, ...
    anchor    TEXT NOT NULL DEFAULT '',      -- chunk index or fragment, '' if N/A
    PRIMARY KEY (src_root, edge_type, dst_root, dst_uri, anchor)
) WITHOUT ROWID;
CREATE INDEX IF NOT EXISTS idx_edges_dst_root ON edges(dst_root) WHERE dst_root <> '';
-- No idx_edges_dst_uri: the gravity_top_inbound analytical query in cli.py
-- counts inbound links per resolved destination *document* (dst_root), which
-- this partial index already serves with a streaming GROUP BY. (An earlier
-- formulation grouped by the raw dst_uri link string with no index and
-- hash-aggregated over every target -> unbounded RSS at corpus scale.)

-- Distillation: core_root <- src_root with Merkle-signed proof binding.
CREATE TABLE IF NOT EXISTS derivations (
    core_root    TEXT NOT NULL,
    src_root     TEXT NOT NULL,
    proof_blob   TEXT NOT NULL,              -- JSON merkle proof
    process_id   TEXT NOT NULL,              -- distillation process identifier
    distilled_at INTEGER NOT NULL,
    PRIMARY KEY (core_root, src_root, process_id),
    FOREIGN KEY (core_root) REFERENCES documents(document_root) ON DELETE CASCADE,
    FOREIGN KEY (src_root)  REFERENCES documents(document_root) ON DELETE CASCADE
);

-- v9.8 providence cache: 8-dim admissibility key + falsification state.
-- Schema-only in Phase 0 (no Q&A runs yet); ready for Phase 1.
CREATE TABLE IF NOT EXISTS providence_cache (
    cache_key                TEXT PRIMARY KEY,
    source_root              TEXT NOT NULL,
    document_uri             TEXT NOT NULL,
    question_hash            TEXT NOT NULL,
    question_text            TEXT NOT NULL,
    answer_text              TEXT NOT NULL,
    merkle_proof             TEXT NOT NULL,   -- JSON
    model_profile_hash       TEXT NOT NULL,   -- model_id + revision + quantization
    conversation_hash        TEXT NOT NULL,
    governance_policy_hash   TEXT NOT NULL,
    schema_version           TEXT NOT NULL,
    canonicalization_version TEXT NOT NULL,
    chunking_version         TEXT NOT NULL,
    falsification_state      TEXT NOT NULL DEFAULT 'live'
                                CHECK (falsification_state IN ('live','failed','stale','quarantined')),
    chain                    TEXT NOT NULL DEFAULT 'private'
                                CHECK (chain IN ('private','public')),
    audit_event_hash         TEXT,            -- latest audit event for this record
    created_at               INTEGER NOT NULL,
    last_hit_at              INTEGER,
    hit_count                INTEGER NOT NULL DEFAULT 0,
    -- v9.8 audit_mode trichotomy (RAG-adapted vocabulary; substrate calls
    -- UNGROUNDED "VISUAL"): STRICT (every quote in answer verified against
    -- context), HYBRID (some claims verified, some emergent), UNGROUNDED
    -- (no verbatim grounding — purely emergent from training).
    -- Default UNGROUNDED: an unclassified record is the weakest claim.
    audit_mode               TEXT NOT NULL DEFAULT 'UNGROUNDED'
                                CHECK (audit_mode IN ('STRICT','HYBRID','UNGROUNDED','CANONICAL_PROJECTION')),
    n_quotes                 INTEGER NOT NULL DEFAULT 0,
    n_verified               INTEGER NOT NULL DEFAULT 0,
    -- JSON array of quoted spans the model produced but we couldn't find
    -- verbatim in context. Mining these surfaces "what the model emerged
    -- beyond the corpus" — candidate ingest targets.
    unverified_quotes        TEXT,
    -- Which verifier strategy classified this record. 'quote' = model
    -- followed the format and wrapped claims in double quotes. 'span' =
    -- bullet/sentence-level substring match in context. 'entity' = no
    -- spans matched but multi-word proper nouns did. 'paraphrase' =
    -- token-coverage match (soft signal). 'claim_lattice' = quote-by-
    -- pointer mode (model emitted JSON; verifier checked evidence_id
    -- resolution + source_role + manual-quote prohibition).
    -- 'canonical_projection' = answer produced by a deterministic π*
    -- kernel (arithmetic@v1, logic-kernel@v1, …) — the canonical bytes
    -- ARE the proof. 'none' = no evidence at all (truly emergent).
    verifier_method          TEXT NOT NULL DEFAULT 'none'
                                CHECK (verifier_method IN ('quote','span','entity','paraphrase','claim_lattice','canonical_projection','none')),
    -- Per-run Merkle-DAG. run_dag_root = MerkleTree over ordered stage
    -- hashes (question / retrieval / context / prompt / answer / verify /
    -- final_label). run_dag_blob carries the full {root, nodes} JSON so
    -- an auditor can recompute & verify. Distinct from audit_event_hash
    -- (linear DB-wide chain). NULL on legacy records pre-2026-04-30.
    run_dag_root             TEXT,
    run_dag_blob             TEXT
);
CREATE INDEX IF NOT EXISTS idx_providence_root  ON providence_cache(source_root);
CREATE INDEX IF NOT EXISTS idx_providence_state ON providence_cache(falsification_state);
-- idx_providence_audit lives in _migrate_audit_mode() so legacy shards
-- (where audit_mode column gets added by ALTER TABLE) don't trip this
-- script before the migration runs.

-- Append-only audit chain. event_hash = sha256(prev_event_hash || canonical(body)).
CREATE TABLE IF NOT EXISTS audit_events (
    seq             INTEGER PRIMARY KEY AUTOINCREMENT,
    event_hash      TEXT NOT NULL UNIQUE,
    prev_event_hash TEXT,                    -- NULL for genesis
    event_type      TEXT NOT NULL,           -- ingest|falsify|evict_warm|evict_cold|derive|rehydrate|...
    subject_root    TEXT,                    -- document_root or cache_key
    body            TEXT NOT NULL,           -- canonical JSON
    ts              INTEGER NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_audit_subject ON audit_events(subject_root);

-- Falsification log: which records were marked failed/stale/quarantined and why.
CREATE TABLE IF NOT EXISTS falsifications (
    cache_key        TEXT NOT NULL,
    state            TEXT NOT NULL,
    reason           TEXT,
    by_actor         TEXT,
    at               INTEGER NOT NULL,
    audit_event_hash TEXT NOT NULL,
    PRIMARY KEY (cache_key, at)
);

-- Snapshots: corpus-level Merkle root pinning a forest state at a point in
-- time. snapshot_root = MerkleTree.build([sorted document_roots]). Audit-
-- chain-linked so peers can verify a claimed snapshot was actually witnessed
-- by this instance. parent_snapshot lets snapshots chain (A -> B -> C) for
-- diff/replay. doc_count is informational; the root is the canonical id.
CREATE TABLE IF NOT EXISTS snapshots (
    snapshot_root      TEXT PRIMARY KEY,
    taken_at           INTEGER NOT NULL,
    audit_event_hash   TEXT NOT NULL,
    doc_count          INTEGER NOT NULL,
    parent_snapshot    TEXT,
    reason             TEXT
);
CREATE INDEX IF NOT EXISTS idx_snapshots_taken_at ON snapshots(taken_at);

-- Mesh layer tables. Off by default — populated only when the user runs
-- `arborist mesh init`. Never accessed by ingest / query / distill paths;
-- mesh state is opt-in plumbing for federated peers (see arborist.mesh).
CREATE TABLE IF NOT EXISTS mesh_identity (
    id           INTEGER PRIMARY KEY CHECK (id = 1),  -- singleton
    member_id    TEXT NOT NULL UNIQUE,
    sign_priv    BLOB NOT NULL,                       -- ed25519 32B raw
    sign_pub     BLOB NOT NULL,                       -- ed25519 32B raw
    dh_priv      BLOB NOT NULL,                       -- x25519 32B raw
    dh_pub       BLOB NOT NULL,                       -- x25519 32B raw
    group_name   TEXT NOT NULL,
    created_at   INTEGER NOT NULL
);

-- Per-epoch roster. epoch 0 = group genesis (founder only). Each membership
-- mutation (join, kick, scheduled rotate) bumps the epoch_id by 1 and writes
-- a fresh row-set capturing the new roster.
CREATE TABLE IF NOT EXISTS mesh_roster (
    epoch_id     INTEGER NOT NULL,
    member_id    TEXT NOT NULL,
    sign_pub     BLOB NOT NULL,
    dh_pub       BLOB NOT NULL,
    role         TEXT NOT NULL DEFAULT 'member'
                    CHECK (role IN ('admin','member')),
    PRIMARY KEY (epoch_id, member_id)
);
CREATE INDEX IF NOT EXISTS idx_mesh_roster_member ON mesh_roster(member_id);

-- Epoch lifecycle log. secret_envelope is JSON of the form
--    {"member_id": {"nonce_b64": "...", "ct_b64": "..."}, ...}
-- where each entry is the symmetric epoch secret AEAD-wrapped to that
-- member's X25519 pubkey via ECDH. Eviction happens by NOT including the
-- evicted member's entry in the next epoch's envelope.
CREATE TABLE IF NOT EXISTS mesh_epochs (
    epoch_id           INTEGER PRIMARY KEY,
    started_at         INTEGER NOT NULL,
    started_event_hash TEXT NOT NULL,
    secret_envelope    TEXT NOT NULL,
    reason             TEXT
);

-- Per-peer audit-chain tracking. Each row records the most recent
-- event_hash a given peer has broadcast to us; we enforce that every
-- subsequent gossip envelope carries `prev_event_hash == last_event_hash`
-- of that peer. A mismatch is a fork — the gossip is rejected (409).
-- last_seq is the local count of accepted envelopes from that peer
-- (informational; the canonical chain identity is last_event_hash).
CREATE TABLE IF NOT EXISTS mesh_peer_chains (
    peer_member_id    TEXT PRIMARY KEY,
    last_event_hash   TEXT NOT NULL,
    last_seq          INTEGER NOT NULL,
    last_seen_at      INTEGER NOT NULL
);

-- FTS5 over chunk content for UNGROUNDED-mode keyword search.
--
-- Contentless mode (`content=''`): FTS5 stores ONLY the inverted index, no
-- copy of the indexed text. This eliminates the ~28 MB / 1000 docs that the
-- prior schema spent on chunks_fts_content (the stored copy was redundant
-- with chunks.content). The trade: snippet() / highlight() return empty
-- in contentless mode, so the FTS5 backend builds snippets in Python by
-- joining `chunks_fts.rowid = chunks.chunk_id`, decompressing chunks.content,
-- and locating query tokens.
--
-- Inserts use `INSERT INTO chunks_fts (rowid, content) VALUES (chunk_id, plain)`
-- — the rowid must equal the chunks.chunk_id of the underlying row so the
-- search-time join lines up.
CREATE VIRTUAL TABLE IF NOT EXISTS chunks_fts USING fts5(
    content,
    content='',
    contentless_delete=1,
    tokenize = 'porter unicode61'
);

-- Document-title FTS5 index. Replaces the un-indexable
-- `LOWER(title) LIKE '%tok%'` title-LIKE search with O(K) hash
-- lookup. Pre-2026-05-02 the title-LIKE backup was either skipped
-- (>5 tokens) or paid ~10s/shard for short queries. The MATCH-based
-- replacement runs in ~0.05s/shard regardless of token count, which
-- means we can re-enable synonym-expanded title search for long
-- queries without paying the corpus-scan cost.
--
-- Contentless mode: same trick as chunks_fts — store only the
-- inverted index, not a copy of the title. The rowid joins back to
-- documents.rowid (sqlite's hidden integer rowid is fine for a
-- 1-1 mapping). On re-ingest, the FTS5 row gets replaced via the
-- ingest path's INSERT OR REPLACE INTO documents flow.
CREATE VIRTUAL TABLE IF NOT EXISTS documents_fts USING fts5(
    title,
    content='',
    contentless_delete=1,
    tokenize = 'porter unicode61'
);

-- Concept-relations layer. Append-only secondary index over the corpus.
-- Each row is a (token, target) edge of a given relation_kind, derived
-- from a specific source document by a specific extractor (evidence_kind).
-- Re-derivation is idempotent at the (source_root, relation_kind, token,
-- target, evidence_kind) level via UNIQUE.
--
-- This table is SEPARATE from the Merkle layer: writes here NEVER affect
-- document_root / chunk_root / cache_key. So the corpus's whole Merkle
-- tree stays valid across re-derivations; we can backfill or re-extract
-- concept relations without invalidating any cached answers.
--
-- Cross-shard lookup. Concept relations live in the shard whose document
-- they were derived from; the lookup helpers in arborist.concepts walk all
-- shards (same pattern as cross-shard FTS5 search). Mesh sync moves shards
-- between peers; concept relations come along for the ride automatically.
--
-- relation_kind:
--   'synonym'  - token & target retrieve interchangeably (See-also
--                bidirectional, redirect target, internal-link cluster)
--   'antonym'  - token & target are explicit opposites (manual / hatnote
--                "not to be confused with")
--   'rivalry'  - token & target compete in a category (same-category
--                membership without cross-link; manual rivalries)
--   'category' - token belongs to category target (Wikipedia
--                [[Category:X]] tail; HTML schema.org/<meta> classification)
--
-- evidence_kind: which extractor produced the row. Lets `arborist concepts
-- purge --evidence-kind X` revoke a single extractor's output cleanly
-- without touching manual or other-extractor rows. New extractors register
-- a stable evidence_kind string; legacy seeds are 'manual_legacy'.
CREATE TABLE IF NOT EXISTS concept_relations (
    id              INTEGER PRIMARY KEY AUTOINCREMENT,
    source_root     TEXT NOT NULL,
    relation_kind   TEXT NOT NULL
                        CHECK (relation_kind IN ('synonym','antonym','rivalry','category')),
    token           TEXT NOT NULL,
    target          TEXT NOT NULL,
    evidence_kind   TEXT NOT NULL,
    confidence      REAL NOT NULL DEFAULT 1.0,
    derived_at      INTEGER NOT NULL,
    derived_from    TEXT,                                  -- shard/uri/extractor identifier
    UNIQUE (source_root, relation_kind, token, target, evidence_kind)
);
CREATE INDEX IF NOT EXISTS idx_concept_token  ON concept_relations(token);
CREATE INDEX IF NOT EXISTS idx_concept_target ON concept_relations(target);
CREATE INDEX IF NOT EXISTS idx_concept_kind   ON concept_relations(relation_kind);
CREATE INDEX IF NOT EXISTS idx_concept_evid   ON concept_relations(evidence_kind);

-- Per-token corpus document-frequency (for IDF ranking at synonym
-- expansion cap-time). Computed once at backfill via fts5vocab over
-- chunks_fts. Only tokens that appear in concept_relations get a row;
-- the synonym layer is the consumer & it ranks expansion by 1/log(doc_freq)
-- when the cap saturates so common-corpus words drop before rare topical
-- ones.
--
-- doc_freq is FTS5-chunk-level (number of chunks containing the term);
-- adequate proxy for true doc-level since chunks are sized 512 tokens
-- and a doc rarely has the same term in only one chunk. Cross-shard
-- ranking sums doc_freq across all shards' rows.
CREATE TABLE IF NOT EXISTS concept_token_idf (
    token       TEXT PRIMARY KEY,
    doc_freq    INTEGER NOT NULL,
    total_docs  INTEGER NOT NULL,
    derived_at  INTEGER NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_token_idf_freq ON concept_token_idf(doc_freq);

-- Adapter LossReport sidecar (ticket #000022). Per-chunk record of what
-- bytes the adapter / canonicalizer dropped, transformed, or normalized
-- on the way from raw bytes to the prose the model sees.
--
-- DESIGN: SIDECAR ONLY. This table never enters cache_key, document_root,
-- run_dag_root, or audit_events.event_hash preimage. Its policy lives in
-- ``loss_report_policy_hash`` (column below) — separate from the QA
-- ``governance_policy_hash`` so toggling reporting does NOT invalidate
-- prior cache entries. See ticket §3.7.
--
-- ``loss_mode`` semantics (closed enum):
--   'pure_drop'   content removed entirely (e.g. <ref> blocks).
--                 sum(bytes_dropped) MUST be <= input_length_bytes.
--   'transform'   content rewritten to different bytes (e.g.
--                 [[Velociraptor]] -> Velociraptor). bytes_dropped
--                 records a delta, not a deletion. Excluded from
--                 the byte-conservation property test.
--   'quarantine'  content preserved elsewhere but withheld from prose.
--                 Reserved; no v0 emitter.
--   'normalize'   semantically low-content formatting normalization
--                 (whitespace runs, newline collapse, &nbsp; runs).
--                 Excluded from byte-conservation accounting.
--
-- ``loss_kind`` is free-string in v0 (per ticket §4 out-of-scope) so
-- adapter authors can mint new kinds. Promotion to enum after >=3
-- adapters are in tree.
--
-- Document-scope losses (e.g. HTML _normalize_text drops that happen
-- before chunking) are anchored to chunk_id of the lowest-idx chunk
-- of the document by convention.
CREATE TABLE IF NOT EXISTS adapter_loss_reports (
    chunk_id                 INTEGER NOT NULL,
    document_root            TEXT NOT NULL,
    stage                    TEXT NOT NULL,
    canonicalization_version TEXT NOT NULL,
    loss_kind                TEXT NOT NULL,
    loss_mode                TEXT NOT NULL
                                CHECK (loss_mode IN ('pure_drop','transform','quarantine','normalize')),
    bytes_dropped            INTEGER NOT NULL,
    occurrence_count         INTEGER NOT NULL,
    input_length_bytes       INTEGER,
    output_length_bytes      INTEGER,
    sample_excerpt           TEXT,
    sample_hash              TEXT,
    adapter_name             TEXT,
    adapter_version          TEXT,
    loss_report_policy_hash  TEXT NOT NULL,
    created_at               INTEGER NOT NULL,
    PRIMARY KEY (chunk_id, stage, canonicalization_version, loss_kind)
);
CREATE INDEX IF NOT EXISTS idx_adapter_loss_doc
    ON adapter_loss_reports(document_root, stage);
CREATE INDEX IF NOT EXISTS idx_adapter_loss_kind
    ON adapter_loss_reports(loss_kind, stage);
CREATE INDEX IF NOT EXISTS idx_adapter_loss_chunk
    ON adapter_loss_reports(chunk_id, stage);

-- Citation aliases (#000041): map a claim-pack record's original
-- source_reference string to a substitute citation when the original
-- text isn't ingestable (proprietary / unavailable) but a comparable
-- PD work is. Read at warrant-resolve time when --use-aliases is
-- passed; alias-resolved derivations carry process_id
-- "warrant-resolver-v1+alias" so the audit trail tells substituted
-- chains from original ones.
--
-- Audit discipline: decision_at + decision_by + decision_rationale
-- are NOT NULL. The CLI refuses to add rows without those fields.
-- "Honest substitute, not silent fabrication."
CREATE TABLE IF NOT EXISTS citation_aliases (
    original_ref       TEXT NOT NULL,
    substitute_ref     TEXT NOT NULL,
    substitute_authors TEXT NOT NULL,        -- JSON array of strings
    substitute_title   TEXT NOT NULL,
    decision_at        INTEGER NOT NULL,
    decision_by        TEXT NOT NULL,
    decision_rationale TEXT,
    PRIMARY KEY (original_ref, substitute_ref)
);
CREATE INDEX IF NOT EXISTS idx_citation_alias_orig
    ON citation_aliases(original_ref);

-- Term aliases (#000042): bridge vocabulary mismatches between a
-- claim-pack record's modern term and a textbook's historical /
-- foreign-language / pre-modern term for the same concept. E.g.,
-- modern "incidence" ↔ Hilbert-1902-translation "connection" in
-- the geometry domain. Read at warrant-resolve time when
-- --use-aliases is passed; query expansion replaces a token with
-- (token OR alt) so FTS5 matches either vocabulary.
--
-- Same audit discipline as citation_aliases.
CREATE TABLE IF NOT EXISTS term_aliases (
    term               TEXT NOT NULL,
    alternate_term     TEXT NOT NULL,
    domain             TEXT NOT NULL,        -- e.g., "geometry", "logic"
    decision_at        INTEGER NOT NULL,
    decision_by        TEXT NOT NULL,
    decision_rationale TEXT,
    PRIMARY KEY (term, alternate_term, domain)
);
CREATE INDEX IF NOT EXISTS idx_term_alias_term_domain
    ON term_aliases(term, domain);
"""


# Per-process memo of shards whose migrations have already run (#000026
# Phase 1). Keys are the shard's resolved path string. The migrations are
# forward-only and idempotent for a given code version. So after one
# successful pass over a path, later `connect()` calls on that path skip
# straight to the per-connection pragmas. Before this memo, a typical query
# issued 588 `connect()` calls, and every one re-ran the migration probes.
#
# Contract: within one process, a path is assumed to name the same SQLite
# file for as long as it stays in this set. Anyone who swaps the file at
# that path (a test that deletes and recreates it, a snapshot restore) must
# call `invalidate_migration_cache(path)` or `_clear_migration_cache()`
# afterwards. Automatic replacement detection was rejected on purpose:
# (dev, inode) misfires when tmpfs reuses inodes, and (mtime, size) change
# during ordinary SQLite operation (WAL checkpoints, page growth). Path
# identity plus explicit invalidation is the honest contract.
_MIGRATED_SHARDS: set[str] = set()


def _clear_migration_cache() -> None:
    """Forget every memoized migration claim held by this process.

    Chiefly a test hook. It is also for flows that deliberately swap a shard
    file in place at an unchanged path (e.g. snapshot restore). The memo set
    is emptied in place, so any other reference to it sees the change.
    """
    memo = _MIGRATED_SHARDS
    memo.clear()


def invalidate_migration_cache(db_path: Path | str) -> None:
    """Forget that ``db_path`` has already been migrated in this process.

    Call this after replacing the SQLite file at ``db_path``. The next
    ``connect()`` on that path then re-runs the full migration probe
    sequence. A path that was never recorded is silently ignored.
    """
    resolved = Path(db_path).resolve()
    _MIGRATED_SHARDS.discard(str(resolved))
def connect(db_path: Path | str = DEFAULT_DB_PATH) -> sqlite3.Connection:
    """Open a writable connection, creating the parent dir + schema if needed.

    Performance pragmas applied per-connection. Under WAL (set in the schema):

    - synchronous=NORMAL skips the per-commit fsync; durable up to the last
      checkpoint (SQLite auto-checkpoints at WAL ~1000 frames).
    - cache_size=-65536 = 64 MB page cache (reduces re-reads).
    - temp_store=MEMORY keeps temp tables in RAM (no /tmp churn).
    - mmap_size=256 MB lets reads come from page-cache without read()
      syscalls.

    Migration probes (executescript(SCHEMA_SQL) + the forward migrations)
    run once per (resolved path, process). Later ``connect()`` calls on the
    same shard skip migration entirely (see #000026 Phase 1). The one
    exception is ``":memory:"``. Every in-memory connection is a separate,
    empty database, so it always gets the full schema and is never memoized.

    ``busy_timeout`` is set on every connection (before the migration pass,
    so it covers that too): a peer mid-write — migration DDL, a
    ``transaction()`` block, ``append_audit``'s own ``BEGIN IMMEDIATE`` —
    makes us *wait* rather than fail fast with ``database is locked``.
    Without it, concurrent appenders that fail-and-retry can re-read a stale
    chain head and fork the audit chain (qa.db seq 7724/7725 was that bug);
    waiting + the ``BEGIN IMMEDIATE`` serialization fixes it.

    Args:
        db_path: Shard file to open, or ``":memory:"``. Defaults to
            ``DEFAULT_DB_PATH``.

    Returns:
        An autocommit (``isolation_level=None``) connection whose
        ``row_factory`` is ``sqlite3.Row``.

    Raises:
        OSError: if the parent directory cannot be created.
        sqlite3.Error: if opening the file, the schema script, a migration,
            or a pragma fails. After a successful open, the connection is
            closed before the error propagates. The path is not recorded as
            migrated, so the next call retries the full migration pass.
    """
    in_memory = str(db_path) == ":memory:"
    p = Path(db_path)
    if not in_memory:
        p.parent.mkdir(parents=True, exist_ok=True)
    # autocommit; we'll BEGIN manually
    conn = sqlite3.connect(":memory:" if in_memory else p, isolation_level=None)
    try:
        conn.row_factory = sqlite3.Row
        # wait on a peer's write lock, don't fail-fast
        conn.execute("PRAGMA busy_timeout = 5000")

        # None => never memoize: each ":memory:" connection is a fresh DB,
        # so keying it by a resolved path would make the second one skip
        # the schema and come up empty.
        cache_key = None if in_memory else str(p.resolve())
        if cache_key is None or cache_key not in _MIGRATED_SHARDS:
            conn.executescript(SCHEMA_SQL)
            _migrate_audit_mode(conn)
            _migrate_mesh_peer_chains(conn)
            _migrate_document_http_meta(conn)
            _migrate_selfmodel_tables(conn)
            _migrate_capital_ledger(conn)
            _migrate_memory_root(conn)
            _migrate_adapter_loss_reports(conn)
            _migrate_controller_events(conn)
            _migrate_fork_score_branches(conn)
            # Only record success after every migration has completed.
            if cache_key is not None:
                _MIGRATED_SHARDS.add(cache_key)

        # Per-connection state — must run on EVERY open. SQLite scopes
        # `foreign_keys` per-connection (not per-DB), so the FK-CASCADE
        # behavior our schema relies on requires this PRAGMA every time
        # we open a connection. The other settings are tuning flags that
        # also live per-connection.
        conn.execute("PRAGMA foreign_keys = ON")
        conn.execute("PRAGMA synchronous = NORMAL")
        conn.execute("PRAGMA cache_size = -65536")
        conn.execute("PRAGMA temp_store = MEMORY")
        conn.execute("PRAGMA mmap_size = 268435456")
    except BaseException:
        # Don't hand a half-initialized handle to the garbage collector;
        # close it deterministically, then let the original error propagate.
        conn.close()
        raise
    return conn
def _add_column_if_missing( conn: sqlite3.Connection, existing: set[str], column: str, ddl: str ) -> None: """Run an ``ALTER TABLE … ADD COLUMN`` idempotently. SQLite has no ``ADD COLUMN IF NOT EXISTS``. The PRAGMA-derived ``existing`` column set is the common-case fast-path skip; the ``duplicate column name`` catch swallows the TOCTOU loser when two ``connect()`` calls race a fresh shard (both probe, both ADD — the second would otherwise raise). Sibling DDL applied by the same codebase, so the column definition the winner installed is identical to ours. """ if column in existing: return try: conn.execute(ddl) except sqlite3.OperationalError as exc: if "duplicate column name" not in str(exc).lower(): raise def _migrate_audit_mode(conn: sqlite3.Connection) -> None: """Forward-migrate pre-v9.8-audit-mode providence_cache shards. Adds audit_mode + n_quotes + n_verified + unverified_quotes + verifier_method columns to DBs that pre-date the faithfulness-classification rollout. SQLite ALTER TABLE ADD COLUMN is O(1) (metadata-only) so this is cheap on every open. Idempotent under repeat-open (PRAGMA fast-path) and under concurrent ``connect()`` (the duplicate-column catch in ``_add_column_if_missing``). Also handles the VISUAL → UNGROUNDED rename for the audit_mode value space. SQLite cannot ALTER a column's CHECK in place, so legacy tables with the old `CHECK (audit_mode IN ('STRICT','HYBRID','VISUAL'))` get rebuilt via the standard temp-table dance, with values translated. Each ``_rebuild_providence_cache_*`` self-wraps in ``BEGIN IMMEDIATE`` so a failure rolls back clean; under a concurrent ``connect()`` the loser waits on that write txn (``busy_timeout`` is raised for the migration pass — see ``connect()``) and then re-runs its probes harmlessly. 
""" cols = {row["name"] for row in conn.execute("PRAGMA table_info(providence_cache)")} _add_column_if_missing( conn, cols, "audit_mode", "ALTER TABLE providence_cache ADD COLUMN audit_mode TEXT " "NOT NULL DEFAULT 'UNGROUNDED' " "CHECK (audit_mode IN ('STRICT','HYBRID','UNGROUNDED'))", ) _add_column_if_missing( conn, cols, "n_quotes", "ALTER TABLE providence_cache ADD COLUMN n_quotes INTEGER NOT NULL DEFAULT 0", ) _add_column_if_missing( conn, cols, "n_verified", "ALTER TABLE providence_cache ADD COLUMN n_verified INTEGER NOT NULL DEFAULT 0", ) _add_column_if_missing( conn, cols, "unverified_quotes", "ALTER TABLE providence_cache ADD COLUMN unverified_quotes TEXT", ) _add_column_if_missing( conn, cols, "verifier_method", "ALTER TABLE providence_cache ADD COLUMN verifier_method TEXT " "NOT NULL DEFAULT 'none' " "CHECK (verifier_method IN ('quote','span','entity','paraphrase','none'))", ) _add_column_if_missing( conn, cols, "run_dag_root", "ALTER TABLE providence_cache ADD COLUMN run_dag_root TEXT", ) _add_column_if_missing( conn, cols, "run_dag_blob", "ALTER TABLE providence_cache ADD COLUMN run_dag_blob TEXT", ) # VISUAL → UNGROUNDED rename. Detect legacy CHECK by inspecting DDL. ddl_row = conn.execute( "SELECT sql FROM sqlite_master WHERE type='table' AND name='providence_cache'" ).fetchone() if ddl_row and "'VISUAL'" in (ddl_row[0] or ""): _rebuild_providence_cache_ungrounded(conn) # paraphrase verifier_method addition. Detect legacy CHECK by # inspecting DDL — if the constraint doesn't already list # 'paraphrase', rebuild the table. ddl_row = conn.execute( "SELECT sql FROM sqlite_master WHERE type='table' AND name='providence_cache'" ).fetchone() if ddl_row and "'paraphrase'" not in (ddl_row[0] or ""): _rebuild_providence_cache_paraphrase(conn) # claim_lattice verifier_method addition (G0 — quote-by-pointer mode). 
ddl_row = conn.execute( "SELECT sql FROM sqlite_master WHERE type='table' AND name='providence_cache'" ).fetchone() if ddl_row and "'claim_lattice'" not in (ddl_row[0] or ""): _rebuild_providence_cache_claim_lattice(conn) # canonical_projection (#000027) — extends BOTH audit_mode and # verifier_method CHECK constraints. CANONICAL_PROJECTION is the # admissibility class for deterministic π* kernel answers # (arithmetic@v1, logic-kernel@v1, …); 'canonical_projection' is the # verifier-method token. Detect either missing → rebuild once. ddl_row = conn.execute( "SELECT sql FROM sqlite_master WHERE type='table' AND name='providence_cache'" ).fetchone() if ddl_row and ( "'CANONICAL_PROJECTION'" not in (ddl_row[0] or "") or "'canonical_projection'" not in (ddl_row[0] or "") ): _rebuild_providence_cache_canonical_projection(conn) conn.execute( "CREATE INDEX IF NOT EXISTS idx_providence_audit " "ON providence_cache(audit_mode)" ) def _migrate_document_http_meta(conn: sqlite3.Connection) -> None: """Forward-migrate pre-recrawl-check shards. Adds ``document_http_meta`` to DBs that pre-date the recrawl-check feature. Mirrors ``_migrate_mesh_peer_chains``: idempotent table-existence probe, then CREATE if missing. """ row = conn.execute( "SELECT name FROM sqlite_master WHERE type='table' AND name='document_http_meta'" ).fetchone() if row is None: conn.execute( "CREATE TABLE IF NOT EXISTS document_http_meta (" " document_root TEXT PRIMARY KEY," " etag TEXT," " last_modified TEXT," " last_fetched_at INTEGER NOT NULL," " last_status INTEGER," " last_checked_at INTEGER," " FOREIGN KEY (document_root) REFERENCES documents(document_root)" " ON DELETE CASCADE" ")" ) def _migrate_selfmodel_tables(conn: sqlite3.Connection) -> None: """Forward-migrate to add SelfModel tables (ticket #000014). Adds ``selfmodel_records`` + ``selfmodel_capability_claims`` to DBs that pre-date SelfModel landing. Idempotent table-existence probe then CREATE-if-missing, matching the existing migration pattern. 
SelfModel rows are advisory by default — they do not enter cache_key unless ``governance_policy.selfmodel_binding`` is flipped on (per ticket #000014 §2.4). """ row = conn.execute( "SELECT name FROM sqlite_master " "WHERE type='table' AND name='selfmodel_records'" ).fetchone() if row is None: conn.execute( "CREATE TABLE IF NOT EXISTS selfmodel_records (" " selfmodel_root TEXT PRIMARY KEY," " schema_version TEXT NOT NULL," " parent_selfmodel_root TEXT," " model_profile_hash TEXT NOT NULL," " verifier_method_root TEXT NOT NULL," " governance_policy_hash TEXT NOT NULL," " canonicalization_version TEXT NOT NULL," " chunking_version TEXT NOT NULL," " accepted_patch_root TEXT," " rejected_patch_root TEXT," " memory_root TEXT," " state TEXT NOT NULL DEFAULT 'live'" " CHECK (state IN ('live','stale','falsified'))," " body_blob BLOB NOT NULL," " audit_event_hash TEXT NOT NULL," " created_at INTEGER NOT NULL," " falsified_at INTEGER," " falsified_reason TEXT" ")" ) conn.execute( "CREATE INDEX IF NOT EXISTS idx_selfmodel_state ON selfmodel_records(state)" ) conn.execute( "CREATE INDEX IF NOT EXISTS idx_selfmodel_parent " "ON selfmodel_records(parent_selfmodel_root)" ) row = conn.execute( "SELECT name FROM sqlite_master " "WHERE type='table' AND name='selfmodel_capability_claims'" ).fetchone() if row is None: conn.execute( "CREATE TABLE IF NOT EXISTS selfmodel_capability_claims (" " claim_hash TEXT PRIMARY KEY," " selfmodel_root TEXT NOT NULL," " metric TEXT NOT NULL," " threshold REAL NOT NULL," " eval_digest TEXT NOT NULL," " measured_value REAL," " measured_at INTEGER," " validity_horizon TEXT," " body_blob BLOB NOT NULL," " FOREIGN KEY (selfmodel_root) REFERENCES selfmodel_records(selfmodel_root)" ")" ) conn.execute( "CREATE INDEX IF NOT EXISTS idx_claim_metric " "ON selfmodel_capability_claims(metric)" ) conn.execute( "CREATE INDEX IF NOT EXISTS idx_claim_selfmodel " "ON selfmodel_capability_claims(selfmodel_root)" ) def _migrate_capital_ledger(conn: sqlite3.Connection) 
-> None: """Forward-migrate to add capital-cost ledger (ticket #000020). Adds ``capital_ledger`` to DBs that pre-date the 8-capital-form cost attribution layer. Sibling table — does NOT enter audit_events.event_hash preimage. Op authors pass an optional CapitalProfile to ``append_audit`` and we record an attached ledger row; no profile = no row, fully backward-compatible. """ row = conn.execute( "SELECT name FROM sqlite_master " "WHERE type='table' AND name='capital_ledger'" ).fetchone() if row is None: conn.execute( "CREATE TABLE IF NOT EXISTS capital_ledger (" " ledger_id INTEGER PRIMARY KEY AUTOINCREMENT," " audit_event_hash TEXT NOT NULL," " op_type TEXT NOT NULL," " living REAL NOT NULL DEFAULT 0," " material REAL NOT NULL DEFAULT 0," " financial REAL NOT NULL DEFAULT 0," " intellectual REAL NOT NULL DEFAULT 0," " experiential REAL NOT NULL DEFAULT 0," " social REAL NOT NULL DEFAULT 0," " cultural REAL NOT NULL DEFAULT 0," " spiritual REAL NOT NULL DEFAULT 0," " estimator_version TEXT NOT NULL," " estimator_inputs_blob TEXT," " recorded_at INTEGER NOT NULL" ")" ) conn.execute( "CREATE INDEX IF NOT EXISTS idx_capital_ledger_audit " "ON capital_ledger(audit_event_hash)" ) conn.execute( "CREATE INDEX IF NOT EXISTS idx_capital_ledger_op " "ON capital_ledger(op_type)" ) def _migrate_controller_events(conn: sqlite3.Connection) -> None: """Forward-migrate to add controller_events (ticket #000037 Phase 2). Adds ``controller_events`` to DBs that pre-date the Prometheus-Sigma advisory-write layer. Sibling table — does NOT enter audit_events.event_hash preimage. Operators can query / aggregate / prune controller_events without integrity risk. Three event kinds populate this table: - ``controller_decision`` — one per ControllerDecision - ``controller_difficulty`` — one per ControllerDecision (records difficulty_next) - ``controller_budget_allocation`` — one per nonzero allocation branch in the decision Idempotent via UNIQUE (event_kind, body_hash). 
body_hash is the sha256 of canonical-JSON-encoded body. """ row = conn.execute( "SELECT name FROM sqlite_master " "WHERE type='table' AND name='controller_events'" ).fetchone() if row is None: conn.execute( "CREATE TABLE IF NOT EXISTS controller_events (" " event_id INTEGER PRIMARY KEY AUTOINCREMENT," " organism_root TEXT NOT NULL," " branch_id TEXT," " event_kind TEXT NOT NULL," " label TEXT," " entropy REAL," " difficulty REAL," " allocation REAL," " body_blob TEXT NOT NULL," " body_hash TEXT NOT NULL," " recorded_at INTEGER NOT NULL," " UNIQUE (event_kind, body_hash)" ")" ) conn.execute( "CREATE INDEX IF NOT EXISTS idx_controller_events_organism " "ON controller_events(organism_root)" ) conn.execute( "CREATE INDEX IF NOT EXISTS idx_controller_events_kind " "ON controller_events(event_kind)" ) conn.execute( "CREATE INDEX IF NOT EXISTS idx_controller_events_at " "ON controller_events(recorded_at)" ) def _migrate_fork_score_branches(conn: sqlite3.Connection) -> None: """Forward-migrate to add fork_score_branches (ticket #000012 Phase 1c). Adds the sibling table that lets ``arborist substrate score`` persist multi-branch checkpoints. Sibling — does NOT enter ``audit_events.event_hash`` preimage, so re-scoring or back- filling cannot break the audit chain. PK ``(branch_set_id, branch_id)`` so re-scoring the same fork under the same set is a clean upsert, not a duplicate row. Default-off at the CLI surface (``--branch-set`` flag absent ⇒ no row written). Feeds #000037 §12 Trigger 1 — once a checkpoint accumulates ≥4 branches in this table, the multi-branch path of the Prometheus-Σ controller has empirical data to fire on. 
""" row = conn.execute( "SELECT name FROM sqlite_master " "WHERE type='table' AND name='fork_score_branches'" ).fetchone() if row is None: conn.execute( "CREATE TABLE IF NOT EXISTS fork_score_branches (" " branch_set_id TEXT NOT NULL," " branch_id TEXT NOT NULL," " parent_root TEXT NOT NULL," " child_root TEXT," " score REAL NOT NULL," " verdict TEXT NOT NULL," " breakdown_blob TEXT NOT NULL," " weights_id TEXT NOT NULL," " estimator_version TEXT NOT NULL," " recorded_at INTEGER NOT NULL," " PRIMARY KEY (branch_set_id, branch_id)" ")" ) conn.execute( "CREATE INDEX IF NOT EXISTS idx_fork_score_branches_set " "ON fork_score_branches(branch_set_id)" ) conn.execute( "CREATE INDEX IF NOT EXISTS idx_fork_score_branches_parent " "ON fork_score_branches(parent_root)" ) def _migrate_memory_root(conn: sqlite3.Connection) -> None: """Forward-migrate to add memory-root tables (ticket #000017). Adds ``memory_records`` + ``memory_branch_summaries`` to DBs that pre-date the lifelong-learning audit summary layer. Memory snapshots are advisory by default — they do not enter cache_key unless ``governance_policy.memory_binding`` is flipped on. See ticket #000017 for branch-projection semantics. 
""" row = conn.execute( "SELECT name FROM sqlite_master " "WHERE type='table' AND name='memory_records'" ).fetchone() if row is None: conn.execute( "CREATE TABLE IF NOT EXISTS memory_records (" " memory_root TEXT PRIMARY KEY," " schema_version TEXT NOT NULL," " parent_memory_root TEXT," " audit_events_high_water TEXT NOT NULL," " branch_summaries_blob BLOB NOT NULL," " state TEXT NOT NULL DEFAULT 'live'" " CHECK (state IN ('live','stale','falsified'))," " audit_event_hash TEXT NOT NULL," " created_at INTEGER NOT NULL," " falsified_at INTEGER," " falsified_reason TEXT" ")" ) conn.execute( "CREATE INDEX IF NOT EXISTS idx_memory_state ON memory_records(state)" ) row = conn.execute( "SELECT name FROM sqlite_master " "WHERE type='table' AND name='memory_branch_summaries'" ).fetchone() if row is None: conn.execute( "CREATE TABLE IF NOT EXISTS memory_branch_summaries (" " branch_id TEXT NOT NULL," " memory_root TEXT NOT NULL," " summary_digest TEXT NOT NULL," " summary_blob BLOB NOT NULL," " count INTEGER NOT NULL," " PRIMARY KEY (branch_id, memory_root)," " FOREIGN KEY (memory_root) REFERENCES memory_records(memory_root)" ")" ) conn.execute( "CREATE INDEX IF NOT EXISTS idx_branch_memory " "ON memory_branch_summaries(memory_root)" ) def _migrate_adapter_loss_reports(conn: sqlite3.Connection) -> None: """Forward-migrate to add adapter_loss_reports (ticket #000022). Sidecar — does NOT enter cache_key, document_root, run_dag_root, audit_events.event_hash preimage. Idempotent table-existence probe, then CREATE-if-missing, mirroring the established migration pattern. 
""" row = conn.execute( "SELECT name FROM sqlite_master " "WHERE type='table' AND name='adapter_loss_reports'" ).fetchone() if row is None: conn.execute( "CREATE TABLE IF NOT EXISTS adapter_loss_reports (" " chunk_id INTEGER NOT NULL," " document_root TEXT NOT NULL," " stage TEXT NOT NULL," " canonicalization_version TEXT NOT NULL," " loss_kind TEXT NOT NULL," " loss_mode TEXT NOT NULL" " CHECK (loss_mode IN ('pure_drop','transform','quarantine','normalize'))," " bytes_dropped INTEGER NOT NULL," " occurrence_count INTEGER NOT NULL," " input_length_bytes INTEGER," " output_length_bytes INTEGER," " sample_excerpt TEXT," " sample_hash TEXT," " adapter_name TEXT," " adapter_version TEXT," " loss_report_policy_hash TEXT NOT NULL," " created_at INTEGER NOT NULL," " PRIMARY KEY (chunk_id, stage, canonicalization_version, loss_kind)" ")" ) conn.execute( "CREATE INDEX IF NOT EXISTS idx_adapter_loss_doc " "ON adapter_loss_reports(document_root, stage)" ) conn.execute( "CREATE INDEX IF NOT EXISTS idx_adapter_loss_kind " "ON adapter_loss_reports(loss_kind, stage)" ) conn.execute( "CREATE INDEX IF NOT EXISTS idx_adapter_loss_chunk " "ON adapter_loss_reports(chunk_id, stage)" ) def _migrate_mesh_peer_chains(conn: sqlite3.Connection) -> None: """Forward-migrate pre-mesh-fork-detection shards. Adds the `mesh_peer_chains` table to DBs that pre-date per-peer audit-chain tracking on the mesh wire. CREATE TABLE IF NOT EXISTS in SCHEMA_SQL covers brand-new shards; this migration is a belt- and-suspenders idempotency check for callers that bypass the full SCHEMA_SQL pass (cross-shard query views, etc.). Idempotent — PRAGMA-checks before issuing CREATE. 
""" row = conn.execute( "SELECT name FROM sqlite_master WHERE type='table' AND name='mesh_peer_chains'" ).fetchone() if row is None: conn.execute( "CREATE TABLE IF NOT EXISTS mesh_peer_chains (" " peer_member_id TEXT PRIMARY KEY," " last_event_hash TEXT NOT NULL," " last_seq INTEGER NOT NULL," " last_seen_at INTEGER NOT NULL" ")" ) def _rebuild_providence_cache_paraphrase(conn: sqlite3.Connection) -> None: """One-time table rebuild: expand verifier_method CHECK to include 'paraphrase' (4th verifier strategy: token-coverage paraphrase matching, alongside quote/span/entity). Same pattern as ``_rebuild_providence_cache_ungrounded``: SQLite can't modify a CHECK constraint in place, so we create a new table with the expanded CHECK, copy the data verbatim (no value translation needed — paraphrase is additive), drop the old, rename the new. Caller probes existing CHECK from sqlite_master and only invokes this when 'paraphrase' is missing. """ new_create = """ CREATE TABLE providence_cache_new ( cache_key TEXT PRIMARY KEY, source_root TEXT NOT NULL, document_uri TEXT NOT NULL, question_hash TEXT NOT NULL, question_text TEXT NOT NULL, answer_text TEXT NOT NULL, merkle_proof TEXT NOT NULL, model_profile_hash TEXT NOT NULL, conversation_hash TEXT NOT NULL, governance_policy_hash TEXT NOT NULL, schema_version TEXT NOT NULL, canonicalization_version TEXT NOT NULL, chunking_version TEXT NOT NULL, falsification_state TEXT NOT NULL DEFAULT 'live' CHECK (falsification_state IN ('live','failed','stale','quarantined')), chain TEXT NOT NULL DEFAULT 'private' CHECK (chain IN ('private','public')), audit_event_hash TEXT, created_at INTEGER NOT NULL, last_hit_at INTEGER, hit_count INTEGER NOT NULL DEFAULT 0, audit_mode TEXT NOT NULL DEFAULT 'UNGROUNDED' CHECK (audit_mode IN ('STRICT','HYBRID','UNGROUNDED')), n_quotes INTEGER NOT NULL DEFAULT 0, n_verified INTEGER NOT NULL DEFAULT 0, unverified_quotes TEXT, verifier_method TEXT NOT NULL DEFAULT 'none' CHECK (verifier_method IN 
('quote','span','entity','paraphrase','none')), run_dag_root TEXT, run_dag_blob TEXT ) """ conn.execute("BEGIN IMMEDIATE") try: conn.execute(new_create) conn.execute( "INSERT INTO providence_cache_new " "(cache_key, source_root, document_uri, question_hash, question_text, " " answer_text, merkle_proof, model_profile_hash, conversation_hash, " " governance_policy_hash, schema_version, canonicalization_version, " " chunking_version, falsification_state, chain, audit_event_hash, " " created_at, last_hit_at, hit_count, audit_mode, n_quotes, " " n_verified, unverified_quotes, verifier_method) " "SELECT " " cache_key, source_root, document_uri, question_hash, question_text, " " answer_text, merkle_proof, model_profile_hash, conversation_hash, " " governance_policy_hash, schema_version, canonicalization_version, " " chunking_version, falsification_state, chain, audit_event_hash, " " created_at, last_hit_at, hit_count, audit_mode, n_quotes, " " n_verified, unverified_quotes, verifier_method " "FROM providence_cache" ) conn.execute("DROP TABLE providence_cache") conn.execute("ALTER TABLE providence_cache_new RENAME TO providence_cache") conn.execute("COMMIT") except Exception: conn.execute("ROLLBACK") raise def _rebuild_providence_cache_claim_lattice(conn: sqlite3.Connection) -> None: """One-time table rebuild: expand verifier_method CHECK to include 'claim_lattice' (quote-by-pointer answer mode added in G0). Same pattern as the paraphrase / ungrounded rebuilds: SQLite cannot modify a CHECK constraint in place, so we create a new table with the expanded CHECK, copy the data verbatim (claim_lattice is additive — no value translation needed), drop the old, rename the new. Caller probes existing CHECK from sqlite_master and only invokes this when 'claim_lattice' is missing. Unlike the older rebuilds this one preserves ``run_dag_root`` & ``run_dag_blob`` columns in the copy. 
By the time a shard reaches this migration those columns may already be populated; dropping them would erase per-run computation provenance. """ new_create = """ CREATE TABLE providence_cache_new ( cache_key TEXT PRIMARY KEY, source_root TEXT NOT NULL, document_uri TEXT NOT NULL, question_hash TEXT NOT NULL, question_text TEXT NOT NULL, answer_text TEXT NOT NULL, merkle_proof TEXT NOT NULL, model_profile_hash TEXT NOT NULL, conversation_hash TEXT NOT NULL, governance_policy_hash TEXT NOT NULL, schema_version TEXT NOT NULL, canonicalization_version TEXT NOT NULL, chunking_version TEXT NOT NULL, falsification_state TEXT NOT NULL DEFAULT 'live' CHECK (falsification_state IN ('live','failed','stale','quarantined')), chain TEXT NOT NULL DEFAULT 'private' CHECK (chain IN ('private','public')), audit_event_hash TEXT, created_at INTEGER NOT NULL, last_hit_at INTEGER, hit_count INTEGER NOT NULL DEFAULT 0, audit_mode TEXT NOT NULL DEFAULT 'UNGROUNDED' CHECK (audit_mode IN ('STRICT','HYBRID','UNGROUNDED')), n_quotes INTEGER NOT NULL DEFAULT 0, n_verified INTEGER NOT NULL DEFAULT 0, unverified_quotes TEXT, verifier_method TEXT NOT NULL DEFAULT 'none' CHECK (verifier_method IN ('quote','span','entity','paraphrase','claim_lattice','none')), run_dag_root TEXT, run_dag_blob TEXT ) """ conn.execute("BEGIN IMMEDIATE") try: conn.execute(new_create) conn.execute( "INSERT INTO providence_cache_new " "(cache_key, source_root, document_uri, question_hash, question_text, " " answer_text, merkle_proof, model_profile_hash, conversation_hash, " " governance_policy_hash, schema_version, canonicalization_version, " " chunking_version, falsification_state, chain, audit_event_hash, " " created_at, last_hit_at, hit_count, audit_mode, n_quotes, " " n_verified, unverified_quotes, verifier_method, " " run_dag_root, run_dag_blob) " "SELECT " " cache_key, source_root, document_uri, question_hash, question_text, " " answer_text, merkle_proof, model_profile_hash, conversation_hash, " " 
governance_policy_hash, schema_version, canonicalization_version, " " chunking_version, falsification_state, chain, audit_event_hash, " " created_at, last_hit_at, hit_count, audit_mode, n_quotes, " " n_verified, unverified_quotes, verifier_method, " " run_dag_root, run_dag_blob " "FROM providence_cache" ) conn.execute("DROP TABLE providence_cache") conn.execute("ALTER TABLE providence_cache_new RENAME TO providence_cache") conn.execute("COMMIT") except Exception: conn.execute("ROLLBACK") raise def _rebuild_providence_cache_canonical_projection(conn: sqlite3.Connection) -> None: """One-time table rebuild: extend audit_mode CHECK to admit 'CANONICAL_PROJECTION' AND verifier_method CHECK to admit 'canonical_projection' (#000027 — canonical projections persist). Same temp-table dance as the paraphrase / claim_lattice rebuilds. SQLite cannot modify a CHECK constraint in place; create new table with both extended CHECKs, copy data verbatim (the new tokens are additive — no value translation), drop old, rename new. Caller probes existing CHECK from sqlite_master and only invokes this helper when EITHER token is missing from the constraint string. Idempotent. 
""" new_create = """ CREATE TABLE providence_cache_new ( cache_key TEXT PRIMARY KEY, source_root TEXT NOT NULL, document_uri TEXT NOT NULL, question_hash TEXT NOT NULL, question_text TEXT NOT NULL, answer_text TEXT NOT NULL, merkle_proof TEXT NOT NULL, model_profile_hash TEXT NOT NULL, conversation_hash TEXT NOT NULL, governance_policy_hash TEXT NOT NULL, schema_version TEXT NOT NULL, canonicalization_version TEXT NOT NULL, chunking_version TEXT NOT NULL, falsification_state TEXT NOT NULL DEFAULT 'live' CHECK (falsification_state IN ('live','failed','stale','quarantined')), chain TEXT NOT NULL DEFAULT 'private' CHECK (chain IN ('private','public')), audit_event_hash TEXT, created_at INTEGER NOT NULL, last_hit_at INTEGER, hit_count INTEGER NOT NULL DEFAULT 0, audit_mode TEXT NOT NULL DEFAULT 'UNGROUNDED' CHECK (audit_mode IN ('STRICT','HYBRID','UNGROUNDED','CANONICAL_PROJECTION')), n_quotes INTEGER NOT NULL DEFAULT 0, n_verified INTEGER NOT NULL DEFAULT 0, unverified_quotes TEXT, verifier_method TEXT NOT NULL DEFAULT 'none' CHECK (verifier_method IN ('quote','span','entity','paraphrase','claim_lattice','canonical_projection','none')), run_dag_root TEXT, run_dag_blob TEXT ) """ conn.execute("BEGIN IMMEDIATE") try: conn.execute(new_create) conn.execute( "INSERT INTO providence_cache_new " "(cache_key, source_root, document_uri, question_hash, question_text, " " answer_text, merkle_proof, model_profile_hash, conversation_hash, " " governance_policy_hash, schema_version, canonicalization_version, " " chunking_version, falsification_state, chain, audit_event_hash, " " created_at, last_hit_at, hit_count, audit_mode, n_quotes, " " n_verified, unverified_quotes, verifier_method, " " run_dag_root, run_dag_blob) " "SELECT " " cache_key, source_root, document_uri, question_hash, question_text, " " answer_text, merkle_proof, model_profile_hash, conversation_hash, " " governance_policy_hash, schema_version, canonicalization_version, " " chunking_version, falsification_state, 
chain, audit_event_hash, " " created_at, last_hit_at, hit_count, audit_mode, n_quotes, " " n_verified, unverified_quotes, verifier_method, " " run_dag_root, run_dag_blob " "FROM providence_cache" ) conn.execute("DROP TABLE providence_cache") conn.execute("ALTER TABLE providence_cache_new RENAME TO providence_cache") conn.execute("COMMIT") except Exception: conn.execute("ROLLBACK") raise def _rebuild_providence_cache_ungrounded(conn: sqlite3.Connection) -> None: """One-time table rebuild: rename audit_mode value VISUAL → UNGROUNDED. SQLite cannot modify a column's CHECK constraint in place. Standard pattern: create new table with new CHECK, copy data while translating values, drop old, rename new. Wrapped in IMMEDIATE transaction so a failure rolls back cleanly without leaving the DB half-migrated. """ new_create = """ CREATE TABLE providence_cache_new ( cache_key TEXT PRIMARY KEY, source_root TEXT NOT NULL, document_uri TEXT NOT NULL, question_hash TEXT NOT NULL, question_text TEXT NOT NULL, answer_text TEXT NOT NULL, merkle_proof TEXT NOT NULL, model_profile_hash TEXT NOT NULL, conversation_hash TEXT NOT NULL, governance_policy_hash TEXT NOT NULL, schema_version TEXT NOT NULL, canonicalization_version TEXT NOT NULL, chunking_version TEXT NOT NULL, falsification_state TEXT NOT NULL DEFAULT 'live' CHECK (falsification_state IN ('live','failed','stale','quarantined')), chain TEXT NOT NULL DEFAULT 'private' CHECK (chain IN ('private','public')), audit_event_hash TEXT, created_at INTEGER NOT NULL, last_hit_at INTEGER, hit_count INTEGER NOT NULL DEFAULT 0, audit_mode TEXT NOT NULL DEFAULT 'UNGROUNDED' CHECK (audit_mode IN ('STRICT','HYBRID','UNGROUNDED')), n_quotes INTEGER NOT NULL DEFAULT 0, n_verified INTEGER NOT NULL DEFAULT 0, unverified_quotes TEXT, verifier_method TEXT NOT NULL DEFAULT 'none' CHECK (verifier_method IN ('quote','span','entity','paraphrase','none')), run_dag_root TEXT, run_dag_blob TEXT ) """ conn.execute("BEGIN IMMEDIATE") try: 
conn.execute(new_create) conn.execute( "INSERT INTO providence_cache_new " "(cache_key, source_root, document_uri, question_hash, question_text, " " answer_text, merkle_proof, model_profile_hash, conversation_hash, " " governance_policy_hash, schema_version, canonicalization_version, " " chunking_version, falsification_state, chain, audit_event_hash, " " created_at, last_hit_at, hit_count, audit_mode, n_quotes, " " n_verified, unverified_quotes, verifier_method) " "SELECT " " cache_key, source_root, document_uri, question_hash, question_text, " " answer_text, merkle_proof, model_profile_hash, conversation_hash, " " governance_policy_hash, schema_version, canonicalization_version, " " chunking_version, falsification_state, chain, audit_event_hash, " " created_at, last_hit_at, hit_count, " " CASE WHEN audit_mode = 'VISUAL' THEN 'UNGROUNDED' ELSE audit_mode END, " " n_quotes, n_verified, unverified_quotes, verifier_method " "FROM providence_cache" ) conn.execute("DROP TABLE providence_cache") conn.execute("ALTER TABLE providence_cache_new RENAME TO providence_cache") conn.execute("COMMIT") except Exception: conn.execute("ROLLBACK") raise # Tables that exist in every shard with the same schema. Used to build # cross-shard UNION views in connect_query(). _SHARDABLE_TABLES = ( "documents", "chunks", "merkle_nodes", "edges", "derivations", "providence_cache", "audit_events", "falsifications", "concept_relations", "concept_token_idf", "adapter_loss_reports", ) # Per-table column lists for cross-shard UNION views. The `chunks` table # is pinned explicitly because the column order matters for cross-shard # search: chunks_fts is contentless and joins back to `chunks.chunk_id`. # Mixing legacy (composite-PK, no chunk_id column) shards with current # (chunk_id-keyed) shards in the same --shards-dir is unsupported — run # the migration on legacy shards first or keep them in a separate dir. _SHARED_COLUMNS = { "chunks": "chunk_id, document_root, idx, leaf_hash, content, tier", }
[docs] def discover_shards(shards_dir: Path | str) -> list[Path]: """Enumerate shard DB files in `shards_dir`. Returns sorted list of paths.""" p = Path(shards_dir) if not p.is_dir(): return [] return sorted(p.glob("*.db"))
def connect_query(
    db_path: Path | str | None = None,
    shards_dir: Path | str | None = None,
) -> sqlite3.Connection:
    """Open a read-only-style connection that surfaces ALL shards as one DB.

    If `shards_dir` is set, every `*.db` in it is ATTACHed and UNION ALL
    views are created over the standard tables so existing queries
    (`SELECT * FROM documents`) work unchanged across shards. Reads only —
    writes still go through `connect()` against a specific shard.

    If `shards_dir` is None, returns a normal `connect(db_path)` for
    back-compat.

    If attaching a shard or building a view fails (e.g. a corrupt shard, or
    more shards than SQLite's ATTACH limit), the in-memory connection is
    closed before the error propagates.
    """
    if shards_dir is None:
        return connect(db_path or DEFAULT_DB_PATH)

    shard_paths = discover_shards(shards_dir)
    conn = sqlite3.connect(":memory:", isolation_level=None)
    try:
        conn.row_factory = sqlite3.Row
        conn.execute("PRAGMA temp_store = MEMORY")

        if not shard_paths:
            # Nothing attached; create empty placeholder tables so callers don't crash.
            conn.executescript(SCHEMA_SQL)
            return conn

        aliases: list[str] = []
        for i, sp in enumerate(shard_paths):
            alias = f"sh{i:03d}"
            conn.execute(f"ATTACH DATABASE ? AS {alias}", (str(sp.resolve()),))
            aliases.append(alias)

        # UNION ALL views over the shardable tables. `chunks` lists its
        # columns explicitly (see _SHARED_COLUMNS) to pin the column order
        # chunks_fts relies on; every other table uses `SELECT *`. Shards
        # are assumed to share one schema generation — mixing legacy
        # composite-PK chunks shards with chunk_id-keyed ones is unsupported.
        for table in _SHARDABLE_TABLES:
            cols = _SHARED_COLUMNS.get(table, "*")
            unions = " UNION ALL ".join(
                f"SELECT {cols} FROM {a}.{table}" for a in aliases
            )
            conn.execute(f"CREATE TEMP VIEW {table} AS {unions}")

        # Stash the shard list for tools that want it.
        conn.execute(
            "CREATE TEMP TABLE _shards (shard_id TEXT, path TEXT, alias TEXT)"
        )
        conn.executemany(
            "INSERT INTO _shards (shard_id, path, alias) VALUES (?, ?, ?)",
            [(sp.stem, str(sp.resolve()), a) for sp, a in zip(shard_paths, aliases)],
        )
    except BaseException:
        conn.close()
        raise
    return conn
[docs] def connect_readonly(db_path: Path | str) -> sqlite3.Connection: """Open an existing DB read-only — ``mode=ro`` URI, no schema bootstrap, no migration probes. For CLI paths that only walk data (audit-chain checks, per-shard counts): a read op must neither run DDL nor take a write lock, but ``connect()`` does both — it runs ``executescript(SCHEMA_SQL)`` plus the forward migrations on the first open of each file in a process. (That migration pass on a read-only walk is exactly what surfaced the ``fork_score_branches already exists`` crash.) Raises ``sqlite3.OperationalError`` if the file is missing or unreadable; callers iterating a shard set should catch and skip if they expect stragglers. """ conn = sqlite3.connect( f"file:{Path(db_path)}?mode=ro", uri=True, isolation_level=None ) conn.row_factory = sqlite3.Row return conn
@contextmanager
def transaction(conn: sqlite3.Connection) -> Iterator[sqlite3.Connection]:
    """BEGIN IMMEDIATE / COMMIT / ROLLBACK around a block.

    Takes the write lock up front (``BEGIN IMMEDIATE``) and yields ``conn``.
    On normal exit the transaction is committed. Any exception escaping the
    block, including ``BaseException`` subclasses such as
    ``KeyboardInterrupt``, rolls the transaction back. The connection is
    therefore never left inside an open transaction holding the write lock.

    A failing ``COMMIT`` is also rolled back before its error propagates.
    One example is a deferred foreign-key violation, which leaves the
    transaction open.

    ROLLBACK is only issued while a transaction is still active, so a
    block that already ended it does not get its original exception masked
    by ``cannot rollback - no transaction is active``.
    """
    conn.execute("BEGIN IMMEDIATE")
    try:
        yield conn
    except BaseException:
        if conn.in_transaction:
            conn.execute("ROLLBACK")
        raise
    try:
        conn.execute("COMMIT")
    except BaseException:
        if conn.in_transaction:
            conn.execute("ROLLBACK")
        raise
def _canonical_json(obj) -> str: """Stable JSON for audit hashing: sorted keys, no whitespace.""" return json.dumps(obj, sort_keys=True, separators=(",", ":"), ensure_ascii=False)
def get_meta(conn: sqlite3.Connection, key: str) -> str | None:
    """Read a value from the per-DB ``meta`` table.

    Returns the stored string, or ``None`` when *key* has no row. The column
    is read by position, so this works whether or not the connection's
    ``row_factory`` is ``sqlite3.Row``.
    """
    row = conn.execute("SELECT value FROM meta WHERE key = ?", (key,)).fetchone()
    return None if row is None else row[0]
def set_meta(conn: sqlite3.Connection, key: str, value: str) -> None:
    """Insert or overwrite *key* in the ``meta`` table.

    The row's ``updated_at`` is stamped with the current Unix time in whole
    seconds. No transaction is opened here; the caller is expected to wrap
    the call in one.
    """
    stamped_at = int(time.time())
    upsert_sql = (
        "INSERT INTO meta (key, value, updated_at) VALUES (?, ?, ?) "
        "ON CONFLICT(key) DO UPDATE SET value = excluded.value, "
        "updated_at = excluded.updated_at"
    )
    conn.execute(upsert_sql, (key, value, stamped_at))
def latest_event_hash(conn: sqlite3.Connection) -> str | None:
    """Return the ``event_hash`` of the audit event with the highest ``seq``.

    Returns ``None`` when ``audit_events`` is empty, i.e. the next event is
    the genesis. The column is read by position, so this works with either
    tuple rows or ``sqlite3.Row``.
    """
    row = conn.execute(
        "SELECT event_hash FROM audit_events ORDER BY seq DESC LIMIT 1"
    ).fetchone()
    return None if row is None else row[0]
def chain_audit_events(
    prev_event_hash: str | None,
    events: list[dict],
) -> tuple[list[tuple], str | None]:
    """Compute the event_hash chain for a batch of audit events locally.

    Each event dict must carry ``event_type``, ``body`` (dict) and ``ts``
    (int). ``subject_root`` (str | None) is optional and defaults to None.

    Each link is ``sha256(bytes.fromhex(parent) + canonical_json(body))``.
    The parent bytes are omitted when the parent is None (genesis), and the
    body is encoded as UTF-8 with ``surrogatepass``.
    NOTE(review): only the parent hash and the body feed the digest;
    ``event_type``, ``subject_root`` and ``ts`` are stored but not hashed.

    Returns ``(rows, last_event_hash)``. ``last_event_hash`` equals
    *prev_event_hash* when *events* is empty. Insert the rows with::

        executemany("INSERT INTO audit_events (event_hash, prev_event_hash,
            event_type, subject_root, body, ts) VALUES (?, ?, ?, ?, ?, ?)",
            rows)

    All SHA-256 digests are computed in Python, with zero DB round-trips per
    event.
    """
    import hashlib

    def _link(parent: str | None, payload: str) -> str:
        # Digest of (parent hash bytes, if any) followed by the UTF-8 body.
        digest = hashlib.sha256()
        if parent is not None:
            digest.update(bytes.fromhex(parent))
        digest.update(payload.encode("utf-8", errors="surrogatepass"))
        return digest.hexdigest()

    chained: list[tuple] = []
    head = prev_event_hash
    for event in events:
        payload = _canonical_json(event["body"])
        link = _link(head, payload)
        chained.append(
            (
                link,
                head,
                event["event_type"],
                event.get("subject_root"),
                payload,
                event["ts"],
            )
        )
        head = link
    return chained, head
def append_audit(
    conn: sqlite3.Connection,
    event_type: str,
    body: dict,
    subject_root: str | None = None,
    ts: int | None = None,
) -> str:
    """Append one event to the audit chain and return its event_hash (hex).

    ``ts`` defaults to the current Unix time in whole seconds.

    Atomic head-read + insert. If the connection is not already inside a
    transaction, the read of the current chain head and the INSERT run
    inside this call's own ``BEGIN IMMEDIATE`` / ``COMMIT``. Two concurrent
    appenders therefore serialize on the write lock instead of both reading
    the same head and chaining off it (which forks the chain; qa.db seq
    7724/7725 was exactly that, from two concurrent ``providence_burn``
    writes). A caller already inside a ``transaction()`` gets the append
    folded into that unit. With ``connect()``'s ``busy_timeout`` the loser
    waits rather than failing ``database is locked``.

    The hash is computed by ``chain_audit_events()``, so single appends and
    bulk inserts always produce identical chains.

    When this call owns the transaction:

    - ``COMMIT`` sits inside the protected region, so a failed commit is
      rolled back instead of leaving the transaction open.
    - ``ROLLBACK`` is issued only while a transaction is actually active,
      so it never masks the original error.

    Convenience wrapper for one-off events. Bulk inserts should use
    chain_audit_events() + executemany() inside a ``transaction()`` for
    ~10x throughput on large batches (same serialization guarantee).
    """
    if ts is None:
        ts = int(time.time())

    own_txn = not conn.in_transaction
    if own_txn:
        conn.execute("BEGIN IMMEDIATE")
    try:
        prev = latest_event_hash(conn)
        rows, _ = chain_audit_events(
            prev,
            [
                {
                    "event_type": event_type,
                    "body": body,
                    "subject_root": subject_root,
                    "ts": ts,
                }
            ],
        )
        row = rows[0]
        conn.execute(
            "INSERT INTO audit_events (event_hash, prev_event_hash, event_type, subject_root, body, ts) "
            "VALUES (?, ?, ?, ?, ?, ?)",
            row,
        )
        if own_txn:
            conn.execute("COMMIT")
    except BaseException:
        if own_txn and conn.in_transaction:
            conn.execute("ROLLBACK")
        raise
    # row[0] is the new event_hash (first column of the chained row tuple).
    return row[0]
def stats(conn: sqlite3.Connection) -> dict:
    """Return a quick landscape report of row counts.

    Counts cover documents, chunks, edges, providence_cache and
    audit_events. Documents are also split by ``kind`` (surface / core) and
    chunks by ``tier`` (hot / warm / cold).
    """
    queries = (
        ("documents_total", "SELECT COUNT(*) FROM documents"),
        ("documents_surface", "SELECT COUNT(*) FROM documents WHERE kind='surface'"),
        ("documents_core", "SELECT COUNT(*) FROM documents WHERE kind='core'"),
        ("chunks_total", "SELECT COUNT(*) FROM chunks"),
        ("chunks_hot", "SELECT COUNT(*) FROM chunks WHERE tier='hot'"),
        ("chunks_warm", "SELECT COUNT(*) FROM chunks WHERE tier='warm'"),
        ("chunks_cold", "SELECT COUNT(*) FROM chunks WHERE tier='cold'"),
        ("edges_total", "SELECT COUNT(*) FROM edges"),
        ("providence_total", "SELECT COUNT(*) FROM providence_cache"),
        ("audit_events_total", "SELECT COUNT(*) FROM audit_events"),
    )
    report: dict = {}
    for label, sql in queries:
        report[label] = conn.execute(sql).fetchone()[0]
    return report