Source code for arborist.document
"""Document and Chunk dataclasses + canonical chunkers.
Every Document carries a URI (identification, backtrack, cross-link) and content.
Chunkers split content into byte-deterministic chunks before Merkle hashing. The
chunker's name is committed in chunking_version — changing the chunker invalidates
all prior cache records under v9.8 admissibility.
"""
from __future__ import annotations
import re
import unicodedata
from dataclasses import dataclass, field
from typing import Protocol
[docs]
@dataclass(frozen=True)
class Edge:
    """Immutable outbound cross-link from one document to another.

    Attributes:
        edge_type: Kind of link, e.g. ``wikilink``, ``citation``,
            ``derived_from``.
        dst_uri: URI of the link target; always present.
        dst_root: Filled in later if/when the target is ingested;
            ``None`` by default.
        anchor: Optional fragment / chunk index; ``None`` by default.
    """

    edge_type: str
    dst_uri: str
    dst_root: str | None = None
    anchor: str | None = None
[docs]
@dataclass
class Document:
    """An ingestable document: a URI, its content, and its outbound edges.

    Attributes:
        uri: Identifier of the document.
        content: Normalized text.
        source_type: Origin format, e.g. ``wikipedia_xml``, ``html``, ``git``.
        title: Optional title; ``None`` by default.
        edges: Outbound cross-links; a fresh empty list per instance.
        extra: Source-specific metadata; a fresh empty dict per instance.
    """

    uri: str
    content: str
    source_type: str
    title: str | None = None
    # default_factory gives every instance its own container instead of a
    # shared mutable default.
    edges: list[Edge] = field(default_factory=list)
    extra: dict = field(default_factory=dict)
[docs]
def canonicalize(text: str) -> str:
    """Return the stable, normalized form of *text*.

    Steps, applied in order:

    1. Unicode NFC normalization.
    2. Every run of CR, LF, tab, form feed, or vertical tab becomes a single
       ``"\\n"`` (so tabs turn into newlines, not spaces).
    3. Every run of two or more ASCII spaces becomes one space.
    4. Leading and trailing whitespace is stripped.

    Any change to this output requires a CANONICALIZATION_VERSION bump.
    """
    rewrites = (
        (r"[\r\n\t\f\v]+", "\n"),  # line-break / control-whitespace runs
        (r"[ ]{2,}", " "),  # runs of plain spaces
    )
    normalized = unicodedata.normalize("NFC", text)
    for pattern, replacement in rewrites:
        normalized = re.sub(pattern, replacement, normalized)
    return normalized.strip()
[docs]
class Chunker(Protocol):
    """Structural interface for anything that splits canonicalized text.

    Implementations expose a ``name`` string and a ``split`` method that
    returns the chunks in order.
    """

    name: str

    def split(self, text: str) -> list[str]:
        """Split *text* into an ordered list of chunks."""
        ...
[docs]
class TokenChunker:
    """Fixed-size chunker over whitespace tokens (512 per chunk by default).

    "Token" here means whitespace-separated unit, NOT a model BPE token. This
    avoids tokenizer-version drift in the chunking_version.

    The class-level ``name`` identifies the default 512-token configuration.
    Each instance also carries its own ``name`` derived from its
    ``tokens_per_chunk``, so a non-default chunk size never reports itself
    under the 512-token identifier.
    """

    name = "tok-512-v1"

    def __init__(self, tokens_per_chunk: int = 512):
        """Create a chunker emitting at most *tokens_per_chunk* tokens per chunk.

        Raises:
            ValueError: If *tokens_per_chunk* is less than 1.
        """
        # A non-positive size would either make range() raise at split time
        # (0) or silently yield no chunks at all (negative); reject it early.
        if tokens_per_chunk < 1:
            raise ValueError(
                f"tokens_per_chunk must be >= 1, got {tokens_per_chunk}"
            )
        self.tokens_per_chunk = tokens_per_chunk
        # Equals the class-level name for the default size of 512.
        self.name = f"tok-{tokens_per_chunk}-v1"

    def split(self, text: str) -> list[str]:
        """Split *text* into chunks of up to ``tokens_per_chunk`` tokens.

        Tokens are produced by ``str.split()`` and re-joined with single
        spaces, so the original whitespace between tokens is not preserved.
        Empty or whitespace-only input yields an empty list.
        """
        if not text:
            return []
        tokens = text.split()
        size = self.tokens_per_chunk
        return [
            " ".join(tokens[start : start + size])
            for start in range(0, len(tokens), size)
        ]
[docs]
class SentenceChunker:
    """Sentence-aligned chunker (better for short docs like 2003 Wikipedia)."""

    name = "sent-v1"
    # Boundary = whitespace run that follows '.', '!' or '?' and precedes an
    # ASCII uppercase letter or a digit. The punctuation stays with the
    # sentence on its left because both sides are zero-width assertions.
    _split_re = re.compile(r"(?<=[.!?])\s+(?=[A-Z0-9])")

    def split(self, text: str) -> list[str]:
        """Split *text* into stripped, non-empty sentences.

        Naive but deterministic. Text with no sentence boundary comes back as
        a single stripped chunk. Empty or whitespace-only text yields an
        empty list, so a chunk never consists of whitespace alone.
        """
        if not text:
            return []
        stripped = (piece.strip() for piece in self._split_re.split(text))
        return [sentence for sentence in stripped if sentence]
[docs]
def get_chunker(name: str | None = None) -> Chunker:
    """Return a new chunker instance for *name*.

    ``None`` selects the default, ``TokenChunker``. Otherwise *name* must
    equal the ``name`` attribute of one of the known chunker classes.

    Raises:
        ValueError: If *name* matches no known chunker.
    """
    wanted = TokenChunker.name if name is None else name
    for chunker_cls in (TokenChunker, SentenceChunker):
        if wanted == chunker_cls.name:
            return chunker_cls()
    raise ValueError(f"unknown chunker: {name}")