"""Ingest pipeline: Source -> normalize -> chunk -> merkle -> upsert.
Idempotent: re-ingesting the same Document is a no-op (document_root collision
is the upsert key).
Performance shape — bulk-batched writer:
Each batch collapses ALL inserts across N docs into a small set of
executemany() calls (one per table) instead of per-doc calls. Audit chain
hashes computed in pure Python via store.chain_audit_events, then inserted
in one shot. With WAL+synchronous=NORMAL, the dominant cost shifts from
Python<->C boundary crossings to actual SQLite work.
"""
from __future__ import annotations
import sqlite3
import time
from dataclasses import dataclass
from arborist import (
CANONICALIZATION_VERSION,
SCHEMA_VERSION,
)
from arborist.compress import pack_chunk, unpack_chunk
from arborist.document import Document, canonicalize, get_chunker
from arborist.merkle import MerkleTree, hash_leaf
from arborist.progress import Progress
from arborist.source import Source
from arborist.store import (
chain_audit_events,
get_meta,
latest_event_hash,
set_meta,
transaction,
)
# Documents buffered per _flush_batch() call (one transaction each) when
# ingest_source() is not given an explicit batch_size.
DEFAULT_BATCH_SIZE = 200
@dataclass
class _DocArtifacts:
    """Pure, DB-free artifacts derived from one Document.

    Produced by ``_compute_artifacts`` and consumed by ``_flush_batch``.
    """

    # Hex digest of the Merkle root; the document's identity and the key
    # used to detect already-ingested content.
    document_root: str
    # Leaf hashes, one per chunk, in chunk order.
    leaves: list[bytes]
    # Canonicalized chunk texts, parallel to ``leaves``.
    chunk_strs: list[str]
    # Tree built from ``leaves``; its layers above index 0 are persisted
    # as merkle_nodes rows.
    tree: MerkleTree
@dataclass()
class IngestStats:
    """Counters reported back by ``ingest_source``."""

    # Documents counted as read from the source.
    seen: int = 0
    # Documents newly written to the database.
    inserted: int = 0
    # Documents not written: duplicate document_root, or no chunks at all.
    skipped_duplicate: int = 0
    # Database-wide row count of ``chunks`` at the end of the run.
    chunks_total: int = 0
    # Database-wide row count of ``edges`` at the end of the run.
    edges_total: int = 0
def ingest_source(
    conn: sqlite3.Connection,
    source: Source,
    chunker_name: str | None = None,
    limit: int | None = None,
    batch_size: int = DEFAULT_BATCH_SIZE,
    resume: bool = False,
    progress: Progress | None = None,
    loss_report_enabled: bool = True,
    loss_report_excerpts: bool = True,
    loss_report_max_excerpt_bytes: int = 200,
) -> IngestStats:
    """Ingest every document the source yields. Returns counts.

    Documents are buffered and written ``batch_size`` at a time, each batch
    in its own transaction via ``_flush_batch``. A document whose
    canonicalized content yields no chunks is not stored and is counted in
    ``skipped_duplicate``.

    `limit` caps how many documents are read from the source. The source is
    never advanced past that many documents; a non-positive limit reads
    nothing.

    `resume=True` reads the per-source high-water mark from this DB's meta
    table and asks the source to fast-forward past it. After each successful
    batch flush, the high-water mark is updated in meta. A killed process
    can pick up where it left off by re-running with --resume.

    `progress` (optional) gets a `tick(seen, inserted=...)` call after each
    batch flush and a `done(seen, inserted=...)` call at the end. Pass an
    `arborist.progress.Progress` for live stderr output.

    The ``loss_report_*`` options are forwarded to the batch writer.
    ``chunks_total`` / ``edges_total`` in the result are database-wide row
    counts, not per-run deltas.
    """
    from itertools import islice

    chunker = get_chunker(chunker_name)
    stats = IngestStats()
    batch: list[tuple[Document, _DocArtifacts]] = []
    meta_key = f"source_high_water:{source.source_type}"

    if resume:
        prior = get_meta(conn, meta_key)
        if prior is not None and hasattr(source, "start_id"):
            try:
                source.start_id = int(prior)
                source.last_id = int(prior)
            except (TypeError, ValueError):
                # Unparseable high-water mark: leave the source's own
                # starting position alone. Re-reading already-ingested
                # documents is harmless because duplicates are skipped.
                pass

    def flush() -> None:
        """Write the buffered batch, then checkpoint the high-water mark."""
        if not batch:
            return
        inserted, skipped = _flush_batch(
            conn,
            batch,
            chunker.name,
            loss_report_enabled=loss_report_enabled,
            loss_report_excerpts=loss_report_excerpts,
            loss_report_max_excerpt_bytes=loss_report_max_excerpt_bytes,
        )
        stats.inserted += inserted
        stats.skipped_duplicate += skipped
        batch.clear()
        # Checkpoint only after the batch has been written. A crash in
        # between makes a resumed run re-read this batch (skipped as
        # duplicates) rather than lose it.
        if hasattr(source, "last_id") and source.last_id:
            with transaction(conn):
                set_meta(conn, meta_key, str(source.last_id))
        if progress is not None:
            progress.tick(stats.seen, inserted=stats.inserted)

    documents = source.iter_documents()
    if limit is not None:
        # islice stops *before* pulling document limit+1. Breaking out of a
        # plain for-loop after the fact would already have consumed it,
        # over-counting `seen` and possibly moving `source.last_id` (and so
        # the persisted high-water mark) past a document never ingested.
        documents = islice(documents, max(limit, 0))

    for doc in documents:
        stats.seen += 1
        art = _compute_artifacts(doc, chunker)
        if art is None:
            # No chunks to store; counted with duplicates for lack of a
            # dedicated counter.
            stats.skipped_duplicate += 1
            continue
        batch.append((doc, art))
        if len(batch) >= batch_size:
            flush()
    flush()

    if progress is not None:
        progress.done(stats.seen, inserted=stats.inserted)
    stats.chunks_total = conn.execute("SELECT COUNT(*) FROM chunks").fetchone()[0]
    stats.edges_total = conn.execute("SELECT COUNT(*) FROM edges").fetchone()[0]
    return stats
def _compute_artifacts(doc: Document, chunker) -> _DocArtifacts | None:
    """Derive a document's chunks, leaf hashes and Merkle tree.

    Pure function with no database access. Returns ``None`` when the chunker
    produces no chunks for the canonicalized content, signalling the caller
    to skip the document.
    """
    pieces = chunker.split(canonicalize(doc.content))
    if not pieces:
        return None

    leaf_hashes = [hash_leaf(piece.encode("utf-8")) for piece in pieces]
    merkle = MerkleTree.build(leaf_hashes)
    root_hex = merkle.root.hex()
    return _DocArtifacts(
        document_root=root_hex,
        leaves=leaf_hashes,
        chunk_strs=pieces,
        tree=merkle,
    )
def _select_latest_roots_by_uri(
    conn: sqlite3.Connection, uris: list[str]
) -> dict[str, str]:
    """Map each URI to the document_root of its most recently ingested row.

    URIs with no stored document are absent from the result. Queried in
    slabs of 500 URIs to stay under SQLite's bound-parameter limit.
    """
    latest: dict[str, str] = {}
    for slab_start in range(0, len(uris), 500):
        slab = uris[slab_start : slab_start + 500]
        placeholders = ",".join("?" * len(slab))
        for row in conn.execute(
            "SELECT document_uri, document_root FROM documents "
            f"WHERE document_uri IN ({placeholders}) "
            "ORDER BY ingest_ts DESC",
            slab,
        ):
            # First wins (most recent by ORDER BY DESC).
            # NOTE(review): ingest_ts has one-second resolution and every row
            # of a batch shares one value, so a URI with several stored
            # versions can tie; SQLite then returns them in arbitrary order.
            # Consider a tiebreaker column if the schema provides one.
            latest.setdefault(row["document_uri"], row["document_root"])
    return latest


def _wikitext_chunk_losses(chunk: str, *, excerpts: bool, max_excerpt_bytes: int):
    """Run wikitext ``to_base`` on one chunk and return the collector's events.

    The converted text is discarded; chunks.content stays raw wikitext per
    the document_root invariant. Raises ImportError when the optional
    wikitext stack is unavailable.
    """
    from arborist.sources.loss_report import LossCollector
    from arborist.wikitext import (
        ADAPTER_NAME as _WT_ADAPTER,
        BASE_VERSION as _WT_VERSION,
        to_base as _wt_to_base,
    )

    collector = LossCollector(
        excerpts_enabled=excerpts,
        max_excerpt_bytes=max_excerpt_bytes,
        adapter_name=_WT_ADAPTER,
        adapter_version=_WT_VERSION,
    )
    _wt_to_base(chunk, loss_collector=collector)
    return collector.events()


def _flush_batch(
    conn: sqlite3.Connection,
    batch: list[tuple[Document, _DocArtifacts]],
    chunker_name: str,
    *,
    loss_report_enabled: bool = True,
    loss_report_excerpts: bool = True,
    loss_report_max_excerpt_bytes: int = 200,
) -> tuple[int, int]:
    """Bulk-insert the whole batch in one transaction.

    Returns ``(inserted, skipped_duplicate)``.

    All collisions and prior-version lookups happen up front via batched
    SELECTs. New rows accumulate into per-table lists and flush via one
    executemany per table. Audit-chain hashes are computed in pure Python
    and inserted in one shot, and one capital-ledger row is attached to the
    batch's last audit event.

    A document whose document_root already exists (in the DB or earlier in
    this batch) is not re-inserted, but its edges are still upserted. For a
    new document, the most recent earlier version with the same URI (from
    this batch, else from the DB) gets a ``supersedes`` edge and is named in
    the audit event.
    """
    inserted = 0
    skipped = 0
    ingest_ts = int(time.time())
    with transaction(conn):
        # 1) Collision check: which document_roots already exist?
        roots = [art.document_root for _, art in batch]
        existing: set[str] = _select_existing_roots(conn, roots)
        inserted_this_batch: set[str] = set()

        # Pre-compute the chunk_id range we'll use this batch. Reading MAX
        # under BEGIN IMMEDIATE is safe — concurrent writers serialize on
        # the WAL writer lock, so this snapshot won't race.
        next_chunk_id = (
            conn.execute("SELECT COALESCE(MAX(chunk_id), 0) FROM chunks")
            .fetchone()[0]
            + 1
        )

        # 2) Batched prior-URI resolution: IN-clause SELECTs instead of one
        # SELECT per doc. Maps each URI to the most-recent existing
        # document_root in the DB (pre-batch state).
        new_uris = list(
            {doc.uri for doc, art in batch if art.document_root not in existing}
        )
        prior_db = _select_latest_roots_by_uri(conn, new_uris)

        # Within-batch chain state: as we process, the "prior" for the next
        # doc with the same URI becomes the doc we just inserted.
        prior_for_doc: dict[str, str] = {}
        documents_rows: list[tuple] = []
        chunk_rows: list[tuple] = []
        fts_rows: list[tuple] = []
        merkle_rows: list[tuple] = []
        edge_rows: list[tuple] = []
        audit_events: list[dict] = []
        edges_to_upsert: list[tuple[str, Document]] = []
        # (chunk_id, document_root, [LossEvent...]) collected during the
        # main loop, persisted at end-of-batch. See ticket #000022 §3.5.
        # Sidecar — does NOT enter audit_events.
        loss_persist: list[tuple[int, str, list]] = []
        loss_policy_hash = ""
        if loss_report_enabled:
            from arborist.sources.loss_report import compute_loss_report_policy_hash

            loss_policy_hash = compute_loss_report_policy_hash(
                enabled=loss_report_enabled,
                excerpts=loss_report_excerpts,
                max_excerpt_bytes=loss_report_max_excerpt_bytes,
            )
        # Cleared on the first ImportError from the optional wikitext stack,
        # so the failing import is attempted once per batch, not per chunk.
        wikitext_losses_available = loss_report_enabled

        for doc, art in batch:
            if art.document_root in existing or art.document_root in inserted_this_batch:
                # Content already stored: refresh its edges, skip the rows.
                edges_to_upsert.append((art.document_root, doc))
                skipped += 1
                continue
            prior_root = prior_for_doc.get(doc.uri) or prior_db.get(doc.uri)
            if prior_root == art.document_root:
                prior_root = None  # same content, not a real prior
            documents_rows.append(
                (
                    art.document_root,
                    doc.uri,
                    doc.source_type,
                    doc.title,
                    chunker_name,
                    CANONICALIZATION_VERSION,
                    SCHEMA_VERSION,
                    ingest_ts,
                )
            )
            is_wikitext_source = doc.source_type.startswith("wikipedia_")
            first_chunk_id_for_doc: int | None = None
            for i, c in enumerate(art.chunk_strs):
                chunk_id = next_chunk_id
                next_chunk_id += 1
                if first_chunk_id_for_doc is None:
                    first_chunk_id_for_doc = chunk_id
                chunk_rows.append(
                    (chunk_id, art.document_root, i, art.leaves[i].hex(), pack_chunk(c))
                )
                # Contentless FTS5 indexes the plaintext but stores no copy;
                # rowid must equal chunks.chunk_id so search-time JOINs line up.
                fts_rows.append((chunk_id, c))
                # Per-chunk wikitext_base loss reporting: capture what the
                # LLM-side path would later drop. Output text is discarded.
                if wikitext_losses_available and is_wikitext_source:
                    try:
                        events = _wikitext_chunk_losses(
                            c,
                            excerpts=loss_report_excerpts,
                            max_excerpt_bytes=loss_report_max_excerpt_bytes,
                        )
                    except ImportError:
                        # mwparserfromhell not installed — skip silently.
                        # LossReport is additive metadata, never a gate.
                        # Python does not cache failed imports in
                        # sys.modules, so retrying would repeat the failing
                        # import for every remaining chunk of the batch.
                        wikitext_losses_available = False
                    else:
                        if events:
                            loss_persist.append(
                                (chunk_id, art.document_root, events)
                            )
            # Document-scope losses (e.g. HTML normalize) anchor to the
            # first chunk_id by convention. See ticket #000022 §3.4.
            if (
                loss_report_enabled
                and first_chunk_id_for_doc is not None
                and isinstance(doc.extra, dict)
            ):
                doc_events = doc.extra.get("loss_events")
                if doc_events:
                    loss_persist.append(
                        (first_chunk_id_for_doc, art.document_root, list(doc_events))
                    )
            # Layer 0 is not written here (presumably the leaves, which are
            # already stored as chunks.leaf_hash).
            for layer_idx in range(1, len(art.tree.layers)):
                for node_idx, h in enumerate(art.tree.layers[layer_idx]):
                    merkle_rows.append(
                        (art.document_root, layer_idx, node_idx, h.hex())
                    )
            if prior_root is not None:
                edge_rows.append(
                    (art.document_root, prior_root, doc.uri, "supersedes", "")
                )
            edges_to_upsert.append((art.document_root, doc))
            inserted_this_batch.add(art.document_root)
            prior_for_doc[doc.uri] = art.document_root
            audit_events.append(
                {
                    "event_type": "ingest",
                    "subject_root": art.document_root,
                    "ts": ingest_ts,
                    "body": {
                        "document_uri": doc.uri,
                        "source_type": doc.source_type,
                        "chunks": len(art.chunk_strs),
                        "chunking_version": chunker_name,
                        "canonicalization_version": CANONICALIZATION_VERSION,
                        "schema_version": SCHEMA_VERSION,
                        "supersedes": prior_root,
                    },
                }
            )
            inserted += 1

        # 3) One executemany per table — minimal Python<->C boundary crossings.
        if documents_rows:
            conn.executemany(
                "INSERT INTO documents "
                "(document_root, document_uri, source_type, kind, compression_depth, "
                " title, chunking_version, canonicalization_version, schema_version, "
                " ingest_ts) "
                "VALUES (?, ?, ?, 'surface', 0, ?, ?, ?, ?, ?)",
                documents_rows,
            )
        if chunk_rows:
            conn.executemany(
                "INSERT INTO chunks (chunk_id, document_root, idx, leaf_hash, content) "
                "VALUES (?, ?, ?, ?, ?)",
                chunk_rows,
            )
            conn.executemany(
                "INSERT INTO chunks_fts (rowid, content) VALUES (?, ?)",
                fts_rows,
            )
        if merkle_rows:
            conn.executemany(
                "INSERT INTO merkle_nodes (document_root, layer, idx, hash) "
                "VALUES (?, ?, ?, ?)",
                merkle_rows,
            )

        # 3.5) Adapter LossReport sidecar (ticket #000022). Persisted
        # under the same transaction as chunks so an auditor never sees
        # a chunk_id without its loss rows. Sidecar — does NOT enter
        # audit_events / cache_key / document_root.
        if loss_persist:
            from arborist.sources.loss_report import record_losses

            for ck, drt, evs in loss_persist:
                record_losses(
                    conn,
                    chunk_id=ck,
                    document_root=drt,
                    events=evs,
                    loss_report_policy_hash=loss_policy_hash,
                    ts=ingest_ts,
                )

        # 4) Edges: collect all wikilinks across the batch and resolve in
        # batched SELECTs. Runs after the documents INSERT so links to docs
        # from this same batch can resolve.
        _flush_edges(conn, edges_to_upsert)
        if edge_rows:
            conn.executemany(
                "INSERT OR IGNORE INTO edges "
                "(src_root, dst_root, dst_uri, edge_type, anchor) "
                "VALUES (?, ?, ?, ?, ?)",
                edge_rows,
            )

        # 5) Audit chain — hashes computed in Python, inserted in one call.
        if audit_events:
            prev = latest_event_hash(conn)
            audit_rows, _ = chain_audit_events(prev, audit_events)
            conn.executemany(
                "INSERT INTO audit_events "
                "(event_hash, prev_event_hash, event_type, subject_root, body, ts) "
                "VALUES (?, ?, ?, ?, ?, ?)",
                audit_rows,
            )
            # Capital ledger (ticket #000020). One row per batch attached
            # to the last event_hash. Sibling table — does NOT enter
            # audit_events.event_hash preimage.
            from arborist.capital import profile_for_op, record as capital_record

            total_chunks = sum(ev["body"].get("chunks", 0) for ev in audit_events)
            # Heuristic: ~512 tokens × ~4 chars/token per chunk.
            approx_bytes = total_chunks * 512 * 4
            profile, inputs = profile_for_op(
                "ingest",
                {
                    "doc_count": len(audit_events),
                    "total_bytes": approx_bytes,
                },
            )
            last_event_hash = audit_rows[-1][0]
            capital_record(
                conn,
                audit_event_hash=last_event_hash,
                op_type="ingest",
                profile=profile,
                estimator_inputs=inputs,
                ts=ingest_ts,
            )
    return inserted, skipped
def _select_existing_roots(
    conn: sqlite3.Connection, roots: list[str]
) -> set[str]:
    """Return the subset of ``roots`` already stored in ``documents``.

    Lookups use ``IN (...)`` queries of at most 500 parameters each, staying
    below SQLite's historical default cap of 999 bound parameters.
    """
    present: set[str] = set()
    if roots:
        step = 500
        offset = 0
        while offset < len(roots):
            piece = roots[offset : offset + step]
            marks = ",".join("?" * len(piece))
            query = f"SELECT document_root FROM documents WHERE document_root IN ({marks})"
            present.update(row["document_root"] for row in conn.execute(query, piece))
            offset += step
    return present
def _flush_edges(
    conn: sqlite3.Connection,
    edges_to_upsert: list[tuple[str, Document]],
) -> None:
    """Upsert all wikilink/hyperlink edges across the batch.

    Each ``(src_root, doc)`` pair contributes one row per entry in
    ``doc.edges``. An edge without an explicit ``dst_root`` is resolved by
    URI to the earliest-ingested stored document with that URI, or ``""``
    when none exists. Resolution uses batched SELECTs (slabs of 500 URIs)
    instead of one query per edge. Rows are written with
    ``INSERT OR IGNORE``.
    """
    if not edges_to_upsert:
        return

    # Collect distinct dst_uris across the batch for one resolution lookup.
    uris = list(
        {e.dst_uri for _, doc in edges_to_upsert for e in doc.edges if e.dst_uri}
    )
    uri_to_root: dict[str, str] = {}
    for i in range(0, len(uris), 500):
        slab = uris[i : i + 500]
        placeholders = ",".join("?" * len(slab))
        for row in conn.execute(
            "SELECT document_uri, document_root FROM documents "
            f"WHERE document_uri IN ({placeholders}) "
            "ORDER BY ingest_ts ASC",
            slab,
        ):
            # Earliest ingest wins (matches prior LIMIT 1 ASC behavior). The
            # explicit ORDER BY is what guarantees this: without it SQLite
            # returns rows in an unspecified, plan-dependent order.
            uri_to_root.setdefault(row["document_uri"], row["document_root"])

    rows = [
        (
            src_root,
            e.dst_root or uri_to_root.get(e.dst_uri or "", ""),
            e.dst_uri or "",
            e.edge_type,
            e.anchor or "",
        )
        for src_root, doc in edges_to_upsert
        for e in doc.edges
    ]
    if rows:
        conn.executemany(
            "INSERT OR IGNORE INTO edges "
            "(src_root, dst_root, dst_uri, edge_type, anchor) VALUES (?, ?, ?, ?, ?)",
            rows,
        )
def verify_random_sample(conn: sqlite3.Connection, n: int = 10) -> dict:
    """Spot-check Merkle integrity for up to ``n`` randomly chosen documents.

    For each sampled document_root:
      * the stored leaf hash of chunk 0 must match a re-hash of its unpacked
        content (this check is skipped when ``unpack_chunk`` returns None);
      * the tree rebuilt from the stored leaf hashes must reproduce the root;
      * a fresh inclusion proof for leaf 0 must verify against that root.
    A document with no chunk rows counts as a failure.

    Returns ``{"sampled": ..., "passed": ..., "failed": ...}``.
    """
    from arborist.merkle import hash_leaf, verify_proof

    sampled = conn.execute(
        "SELECT document_root FROM documents ORDER BY RANDOM() LIMIT ?", (n,)
    ).fetchall()
    if not sampled:
        return {"sampled": 0, "passed": 0, "failed": 0}

    def _document_ok(root_hex: str) -> bool:
        stored = conn.execute(
            "SELECT idx, leaf_hash, content FROM chunks "
            "WHERE document_root = ? ORDER BY idx ASC",
            (root_hex,),
        ).fetchall()
        if not stored:
            return False
        leaf_hashes = [bytes.fromhex(entry["leaf_hash"]) for entry in stored]
        first_text = unpack_chunk(stored[0]["content"])
        if first_text is not None:
            if hash_leaf(first_text.encode("utf-8")) != leaf_hashes[0]:
                return False
        rebuilt = MerkleTree.build(leaf_hashes)
        if rebuilt.root.hex() != root_hex:
            return False
        proof = rebuilt.proof(0)
        return bool(verify_proof(proof) and proof.root.hex() == root_hex)

    verdicts = [_document_ok(entry["document_root"]) for entry in sampled]
    good = sum(verdicts)
    return {"sampled": len(sampled), "passed": good, "failed": len(verdicts) - good}