Source code for arborist.ingest

"""Ingest pipeline: Source -> normalize -> chunk -> merkle -> upsert.

Idempotent: re-ingesting the same Document is a no-op (document_root collision
is the upsert key).

Performance shape — bulk-batched writer:
  Each batch collapses ALL inserts across N docs into a small set of
  executemany() calls (one per table) instead of per-doc calls. Audit chain
  hashes computed in pure Python via store.chain_audit_events, then inserted
  in one shot. With WAL+synchronous=NORMAL, the dominant cost shifts from
  Python<->C boundary crossings to actual SQLite work.
"""

from __future__ import annotations

import sqlite3
import time
from dataclasses import dataclass

from arborist import (
    CANONICALIZATION_VERSION,
    SCHEMA_VERSION,
)
from arborist.compress import pack_chunk, unpack_chunk
from arborist.document import Document, canonicalize, get_chunker
from arborist.merkle import MerkleTree, hash_leaf
from arborist.progress import Progress
from arborist.source import Source
from arborist.store import (
    chain_audit_events,
    get_meta,
    latest_event_hash,
    set_meta,
    transaction,
)


# Default for ingest_source's `batch_size`: how many (Document, artifacts)
# pairs are buffered before they are flushed to the database in one bulk write.
DEFAULT_BATCH_SIZE = 200


@dataclass
class _DocArtifacts:
    """Values derived from one Document before any database write happens."""

    # Hex form of the Merkle root (tree.root.hex()); used as the upsert key.
    document_root: str
    # One leaf hash per chunk, index-aligned with chunk_strs.
    leaves: list[bytes]
    # Chunk texts the chunker produced from the canonicalized content.
    chunk_strs: list[str]
    # Merkle tree built over `leaves`.
    tree: MerkleTree

@dataclass
class IngestStats:
    """Counters returned by ingest_source for a single run."""

    # Documents taken from the source iterator.
    seen: int = 0
    # Documents newly written to the database.
    inserted: int = 0
    # Documents not written: already stored, or they produced no chunks.
    skipped_duplicate: int = 0
    # Row count of the whole `chunks` table once the run has finished.
    chunks_total: int = 0
    # Row count of the whole `edges` table once the run has finished.
    edges_total: int = 0
def ingest_source(
    conn: sqlite3.Connection,
    source: Source,
    chunker_name: str | None = None,
    limit: int | None = None,
    batch_size: int = DEFAULT_BATCH_SIZE,
    resume: bool = False,
    progress: Progress | None = None,
    loss_report_enabled: bool = True,
    loss_report_excerpts: bool = True,
    loss_report_max_excerpt_bytes: int = 200,
) -> IngestStats:
    """Ingest every document the source yields. Returns counts.

    Args:
        conn: Open SQLite connection to the target database.
        source: Document producer; must expose `source_type` and
            `iter_documents()`. `start_id` / `last_id` are used when present.
        chunker_name: Passed to `get_chunker` to pick the chunker.
        limit: Maximum number of documents to take from the source; `None`
            means no cap. Documents that chunk to nothing still count toward
            the limit. Zero or a negative value ingests nothing.
        batch_size: Number of documents buffered before a bulk flush.
        resume: When true, read the per-source high-water mark from this DB's
            meta table and set `source.start_id` / `source.last_id` to it so
            the source can fast-forward past it.
        progress: Optional reporter; gets `tick(seen, inserted=...)` after
            each batch flush and `done(seen, inserted=...)` at the end. Pass
            an `arborist.progress.Progress` for live stderr output.
        loss_report_enabled, loss_report_excerpts,
        loss_report_max_excerpt_bytes: Forwarded unchanged to `_flush_batch`.

    Returns:
        IngestStats for this run. `chunks_total` and `edges_total` are row
        counts of the whole tables, not just of this run.

    After each successful batch flush the high-water mark is updated in meta,
    so a killed process can pick up where it left off by re-running with
    --resume.
    """
    # Function-scope import, like the other lazy imports in this module.
    from itertools import islice

    chunker = get_chunker(chunker_name)
    stats = IngestStats()
    batch: list[tuple[Document, _DocArtifacts]] = []
    meta_key = f"source_high_water:{source.source_type}"

    if resume:
        prior = get_meta(conn, meta_key)
        if prior is not None and hasattr(source, "start_id"):
            try:
                high_water = int(prior)
                source.start_id = high_water
                source.last_id = high_water
            except (TypeError, ValueError):
                # Best effort: an unusable stored mark means "start from the
                # beginning" rather than aborting the ingest.
                pass

    def flush() -> None:
        if not batch:
            return
        inserted, skipped = _flush_batch(
            conn,
            batch,
            chunker.name,
            loss_report_enabled=loss_report_enabled,
            loss_report_excerpts=loss_report_excerpts,
            loss_report_max_excerpt_bytes=loss_report_max_excerpt_bytes,
        )
        stats.inserted += inserted
        stats.skipped_duplicate += skipped
        batch.clear()
        # Persist the high-water mark only after the batch itself committed.
        if hasattr(source, "last_id") and source.last_id:
            with transaction(conn):
                set_meta(conn, meta_key, str(source.last_id))
        if progress is not None:
            progress.tick(stats.seen, inserted=stats.inserted)

    # islice stops after `limit` items WITHOUT pulling item limit+1 from the
    # source. The previous increment-then-break loop consumed one extra
    # document, which inflated `stats.seen` by one and, for sources that
    # advance `last_id` as they yield (not visible here), could push the
    # stored high-water mark past a document that was never ingested.
    capped = None if limit is None else max(limit, 0)
    for doc in islice(source.iter_documents(), capped):
        stats.seen += 1
        art = _compute_artifacts(doc, chunker)
        if art is None:
            # Nothing to store for a document that produced no chunks.
            stats.skipped_duplicate += 1
            continue
        batch.append((doc, art))
        if len(batch) >= batch_size:
            flush()
    flush()

    if progress is not None:
        progress.done(stats.seen, inserted=stats.inserted)

    stats.chunks_total = conn.execute("SELECT COUNT(*) FROM chunks").fetchone()[0]
    stats.edges_total = conn.execute("SELECT COUNT(*) FROM edges").fetchone()[0]
    return stats
def _compute_artifacts(doc: Document, chunker) -> _DocArtifacts | None:
    """Pure: canonicalize, chunk, hash leaves, build the tree. No DB access.

    Returns None when the chunker yields no chunks for the document.
    """
    text = canonicalize(doc.content)
    chunk_strs = chunker.split(text)
    if not chunk_strs:
        return None
    leaves = [hash_leaf(c.encode("utf-8")) for c in chunk_strs]
    tree = MerkleTree.build(leaves)
    return _DocArtifacts(
        document_root=tree.root.hex(),
        leaves=leaves,
        chunk_strs=chunk_strs,
        tree=tree,
    )


def _flush_batch(
    conn: sqlite3.Connection,
    batch: list[tuple[Document, _DocArtifacts]],
    chunker_name: str,
    *,
    loss_report_enabled: bool = True,
    loss_report_excerpts: bool = True,
    loss_report_max_excerpt_bytes: int = 200,
) -> tuple[int, int]:
    """Bulk-insert the whole batch. Returns (inserted, skipped_duplicate).

    All collisions and prior-version lookups happen up front via batched
    SELECTs. New rows accumulate into per-table mega-lists and flush via one
    executemany per table. Audit-chain hashes are computed in pure Python and
    inserted in one shot.

    Args:
        conn: Open SQLite connection; rows are read by column name.
        batch: (Document, artifacts) pairs to persist.
        chunker_name: Stored as `chunking_version` on documents and in the
            audit event body.
        loss_report_enabled: Collect and persist loss events for this batch.
        loss_report_excerpts: Passed to the loss collector and policy hash.
        loss_report_max_excerpt_bytes: Passed to the loss collector and
            policy hash.

    Returns:
        (inserted, skipped): documents newly written, and documents whose
        document_root already existed in the DB or earlier in this batch.
    """
    inserted = 0
    skipped = 0
    ingest_ts = int(time.time())

    with transaction(conn):
        # 1) Collision check: which document_roots already exist?
        roots = [art.document_root for _, art in batch]
        existing: set[str] = _select_existing_roots(conn, roots)
        inserted_this_batch: set[str] = set()

        # Pre-compute the chunk_id range we'll use this batch. Reading MAX
        # under BEGIN IMMEDIATE is safe — concurrent writers serialize on
        # the WAL writer lock, so this snapshot won't race.
        next_chunk_id = (
            conn.execute("SELECT COALESCE(MAX(chunk_id), 0) FROM chunks")
            .fetchone()[0]
            + 1
        )

        # 2) Batched prior-URI resolution: one IN-clause SELECT instead of
        #    one per-doc SELECT. Maps each URI to the most-recent existing
        #    document_root in the DB (pre-batch state).
        new_uris = list(
            {doc.uri for doc, art in batch if art.document_root not in existing}
        )
        prior_db: dict[str, str] = {}
        for slab_start in range(0, len(new_uris), 500):
            slab = new_uris[slab_start : slab_start + 500]
            placeholders = ",".join("?" * len(slab))
            for row in conn.execute(
                "SELECT document_uri, document_root FROM documents "
                f"WHERE document_uri IN ({placeholders}) "
                "ORDER BY ingest_ts DESC",
                slab,
            ):
                # First wins (most recent by ORDER BY DESC).
                prior_db.setdefault(row["document_uri"], row["document_root"])

        # Within-batch chain state: as we process, the "prior" for the next
        # doc with the same URI becomes the doc we just inserted.
        prior_for_doc: dict[str, str] = {}

        documents_rows: list[tuple] = []
        chunk_rows: list[tuple] = []
        fts_rows: list[tuple] = []
        merkle_rows: list[tuple] = []
        edge_rows: list[tuple] = []
        audit_events: list[dict] = []
        edges_to_upsert: list[tuple[str, Document]] = []
        # (chunk_id, document_root, [LossEvent...]) collected during the
        # main loop, persisted at end-of-batch in one executemany. See
        # ticket #000022 §3.5. Sidecar — does NOT enter audit_events.
        loss_persist: list[tuple[int, str, list]] = []

        loss_policy_hash = ""
        if loss_report_enabled:
            from arborist.sources.loss_report import compute_loss_report_policy_hash

            loss_policy_hash = compute_loss_report_policy_hash(
                enabled=loss_report_enabled,
                excerpts=loss_report_excerpts,
                max_excerpt_bytes=loss_report_max_excerpt_bytes,
            )

        for doc, art in batch:
            if art.document_root in existing or art.document_root in inserted_this_batch:
                # Duplicate content: no new rows, but its edges are still
                # offered to the edge upsert below.
                edges_to_upsert.append((art.document_root, doc))
                skipped += 1
                continue

            prior_root = prior_for_doc.get(doc.uri) or prior_db.get(doc.uri)
            if prior_root == art.document_root:
                prior_root = None  # same content, not a real prior

            documents_rows.append(
                (
                    art.document_root,
                    doc.uri,
                    doc.source_type,
                    doc.title,
                    chunker_name,
                    CANONICALIZATION_VERSION,
                    SCHEMA_VERSION,
                    ingest_ts,
                )
            )

            is_wikitext_source = doc.source_type.startswith("wikipedia_")
            first_chunk_id_for_doc: int | None = None
            for i, c in enumerate(art.chunk_strs):
                chunk_id = next_chunk_id
                next_chunk_id += 1
                if first_chunk_id_for_doc is None:
                    first_chunk_id_for_doc = chunk_id
                chunk_rows.append(
                    (chunk_id, art.document_root, i, art.leaves[i].hex(), pack_chunk(c))
                )
                # Contentless FTS5 indexes the plaintext but stores no copy;
                # rowid must equal chunks.chunk_id so search-time JOINs line up.
                fts_rows.append((chunk_id, c))

                # Per-chunk wikitext_base loss reporting. Run to_base()
                # eagerly with a collector to capture what the LLM-side
                # path would later drop. Output is discarded —
                # chunks.content stays raw wikitext per the
                # document_root invariant.
                if loss_report_enabled and is_wikitext_source:
                    try:
                        from arborist.sources.loss_report import LossCollector
                        from arborist.wikitext import (
                            ADAPTER_NAME as _WT_ADAPTER,
                            BASE_VERSION as _WT_VERSION,
                            to_base as _wt_to_base,
                        )

                        collector = LossCollector(
                            excerpts_enabled=loss_report_excerpts,
                            max_excerpt_bytes=loss_report_max_excerpt_bytes,
                            adapter_name=_WT_ADAPTER,
                            adapter_version=_WT_VERSION,
                        )
                        _wt_to_base(c, loss_collector=collector)
                        events = collector.events()
                        if events:
                            loss_persist.append(
                                (chunk_id, art.document_root, events)
                            )
                    except ImportError:
                        # mwparserfromhell not installed — skip silently.
                        # LossReport is additive metadata, never a gate.
                        pass

            # Document-scope losses (e.g. HTML normalize) anchor to the
            # first chunk_id by convention. See ticket #000022 §3.4.
            if (
                loss_report_enabled
                and first_chunk_id_for_doc is not None
                and isinstance(doc.extra, dict)
            ):
                doc_events = doc.extra.get("loss_events")
                if doc_events:
                    loss_persist.append(
                        (first_chunk_id_for_doc, art.document_root, list(doc_events))
                    )

            # Layer 0 is skipped: leaf hashes already live on the chunk rows.
            for layer_idx in range(1, len(art.tree.layers)):
                for node_idx, h in enumerate(art.tree.layers[layer_idx]):
                    merkle_rows.append(
                        (art.document_root, layer_idx, node_idx, h.hex())
                    )

            if prior_root is not None:
                edge_rows.append(
                    (art.document_root, prior_root, doc.uri, "supersedes", "")
                )

            edges_to_upsert.append((art.document_root, doc))
            inserted_this_batch.add(art.document_root)
            prior_for_doc[doc.uri] = art.document_root

            audit_events.append(
                {
                    "event_type": "ingest",
                    "subject_root": art.document_root,
                    "ts": ingest_ts,
                    "body": {
                        "document_uri": doc.uri,
                        "source_type": doc.source_type,
                        "chunks": len(art.chunk_strs),
                        "chunking_version": chunker_name,
                        "canonicalization_version": CANONICALIZATION_VERSION,
                        "schema_version": SCHEMA_VERSION,
                        "supersedes": prior_root,
                    },
                }
            )
            inserted += 1

        # 3) One executemany per table — minimal Python<->C boundary crossings.
        if documents_rows:
            conn.executemany(
                "INSERT INTO documents "
                "(document_root, document_uri, source_type, kind, compression_depth, "
                " title, chunking_version, canonicalization_version, schema_version, "
                " ingest_ts) "
                "VALUES (?, ?, ?, 'surface', 0, ?, ?, ?, ?, ?)",
                documents_rows,
            )
        if chunk_rows:
            conn.executemany(
                "INSERT INTO chunks (chunk_id, document_root, idx, leaf_hash, content) "
                "VALUES (?, ?, ?, ?, ?)",
                chunk_rows,
            )
            conn.executemany(
                "INSERT INTO chunks_fts (rowid, content) VALUES (?, ?)",
                fts_rows,
            )
        if merkle_rows:
            conn.executemany(
                "INSERT INTO merkle_nodes (document_root, layer, idx, hash) "
                "VALUES (?, ?, ?, ?)",
                merkle_rows,
            )

        # 3.5) Adapter LossReport sidecar (ticket #000022). Persisted
        #      under the same transaction as chunks so an auditor never sees
        #      a chunk_id without its loss rows. Sidecar — does NOT enter
        #      audit_events / cache_key / document_root.
        if loss_persist:
            from arborist.sources.loss_report import record_losses

            for ck, drt, evs in loss_persist:
                record_losses(
                    conn,
                    chunk_id=ck,
                    document_root=drt,
                    events=evs,
                    loss_report_policy_hash=loss_policy_hash,
                    ts=ingest_ts,
                )

        # 4) Edges: collect all wikilinks across the batch and resolve in
        #    one SELECT. Then one executemany.
        _flush_edges(conn, edges_to_upsert)
        if edge_rows:
            conn.executemany(
                "INSERT OR IGNORE INTO edges "
                "(src_root, dst_root, dst_uri, edge_type, anchor) "
                "VALUES (?, ?, ?, ?, ?)",
                edge_rows,
            )

        # 5) Audit chain — hashes computed in Python, inserted in one call.
        if audit_events:
            prev = latest_event_hash(conn)
            audit_rows, _ = chain_audit_events(prev, audit_events)
            conn.executemany(
                "INSERT INTO audit_events "
                "(event_hash, prev_event_hash, event_type, subject_root, body, ts) "
                "VALUES (?, ?, ?, ?, ?, ?)",
                audit_rows,
            )

            # Capital ledger (ticket #000020). One row per batch attached
            # to the last event_hash. Sibling table — does NOT enter
            # audit_events.event_hash preimage.
            from arborist.capital import profile_for_op, record as capital_record

            total_chunks = sum(
                ev["body"].get("chunks", 0) for ev in audit_events
            )
            # Heuristic: ~512 tokens × ~4 chars/token per chunk.
            approx_bytes = total_chunks * 512 * 4
            profile, inputs = profile_for_op(
                "ingest",
                {
                    "doc_count": len(audit_events),
                    "total_bytes": approx_bytes,
                },
            )
            last_event_hash = audit_rows[-1][0]
            capital_record(
                conn,
                audit_event_hash=last_event_hash,
                op_type="ingest",
                profile=profile,
                estimator_inputs=inputs,
                ts=ingest_ts,
            )

    return inserted, skipped


def _select_existing_roots(
    conn: sqlite3.Connection, roots: list[str]
) -> set[str]:
    """Return the subset of `roots` already present in `documents`.

    Rows are read by column name, so `conn` needs a name-addressable row
    factory such as `sqlite3.Row`.
    """
    if not roots:
        return set()
    # SQLite has a default 999-param limit; chunk just in case.
    found: set[str] = set()
    for i in range(0, len(roots), 500):
        slab = roots[i : i + 500]
        placeholders = ",".join("?" * len(slab))
        for row in conn.execute(
            f"SELECT document_root FROM documents WHERE document_root IN ({placeholders})",
            slab,
        ):
            found.add(row["document_root"])
    return found


def _flush_edges(
    conn: sqlite3.Connection,
    edges_to_upsert: list[tuple[str, Document]],
) -> None:
    """Upsert all wikilink/hyperlink edges across the batch.

    Resolves dst_root for previously-unresolved URIs using a single batched
    SELECT (URI -> document_root) instead of N per-edge SELECTs. When several
    documents share a URI, the one with the smallest ingest_ts is used.

    Args:
        conn: Open SQLite connection; rows are read by column name.
        edges_to_upsert: (src_root, Document) pairs; each document's `edges`
            are written with `src_root` as their source.
    """
    if not edges_to_upsert:
        return

    # Collect distinct dst_uris across the batch for one resolution lookup.
    distinct_uris: set[str] = set()
    for _, doc in edges_to_upsert:
        for e in doc.edges:
            if e.dst_uri:
                distinct_uris.add(e.dst_uri)

    uri_to_root: dict[str, str] = {}
    if distinct_uris:
        uris_list = list(distinct_uris)
        for i in range(0, len(uris_list), 500):
            slab = uris_list[i : i + 500]
            placeholders = ",".join("?" * len(slab))
            for row in conn.execute(
                "SELECT document_uri, document_root FROM documents "
                f"WHERE document_uri IN ({placeholders}) "
                # Without an explicit ORDER BY, SQLite may return rows in any
                # order, so "first row wins" below would not reliably pick
                # the earliest ingest.
                "ORDER BY ingest_ts ASC",
                slab,
            ):
                # Earliest ingest wins (matches prior LIMIT 1 ASC behavior).
                uri_to_root.setdefault(row["document_uri"], row["document_root"])

    rows: list[tuple] = []
    for src_root, doc in edges_to_upsert:
        for e in doc.edges:
            # An explicit dst_root on the edge beats URI resolution;
            # unresolved targets are stored as "".
            dst_root = e.dst_root or uri_to_root.get(e.dst_uri or "", "")
            rows.append(
                (
                    src_root,
                    dst_root,
                    e.dst_uri or "",
                    e.edge_type,
                    e.anchor or "",
                )
            )
    if rows:
        conn.executemany(
            "INSERT OR IGNORE INTO edges "
            "(src_root, dst_root, dst_uri, edge_type, anchor) VALUES (?, ?, ?, ?, ?)",
            rows,
        )
def verify_random_sample(conn: sqlite3.Connection, n: int = 10) -> dict:
    """Spot-check up to `n` randomly chosen documents against their Merkle roots.

    For each sampled document the stored leaf hashes are loaded in chunk
    order, chunk 0's content (when it unpacks to something other than None)
    is re-hashed and compared with its stored leaf, the tree is rebuilt, and
    a proof for leaf 0 is generated and verified against the document_root.

    Returns:
        {"sampled": ..., "passed": ..., "failed": ...}; all zero when the
        documents table is empty.
    """
    from arborist.merkle import hash_leaf, verify_proof

    def document_verifies(root_hex: str) -> bool:
        # True only when every check for this document succeeds.
        stored = conn.execute(
            "SELECT idx, leaf_hash, content FROM chunks "
            "WHERE document_root = ? ORDER BY idx ASC",
            (root_hex,),
        ).fetchall()
        if not stored:
            # A document with no chunk rows cannot be verified.
            return False

        leaf_hashes = [bytes.fromhex(chunk["leaf_hash"]) for chunk in stored]

        first_text = unpack_chunk(stored[0]["content"])
        if first_text is not None:
            if hash_leaf(first_text.encode("utf-8")) != leaf_hashes[0]:
                return False

        rebuilt = MerkleTree.build(leaf_hashes)
        if rebuilt.root.hex() != root_hex:
            return False

        proof = rebuilt.proof(0)
        return bool(verify_proof(proof) and proof.root.hex() == root_hex)

    sampled = conn.execute(
        "SELECT document_root FROM documents ORDER BY RANDOM() LIMIT ?", (n,)
    ).fetchall()

    passed = 0
    for picked in sampled:
        if document_verifies(picked["document_root"]):
            passed += 1

    # Every sampled document lands in exactly one of the two buckets.
    return {
        "sampled": len(sampled),
        "passed": passed,
        "failed": len(sampled) - passed,
    }