Source code for arborist.document

"""Document and Chunk dataclasses + canonical chunkers.

Every Document carries a URI (identification, backtrack, cross-link) and content.
Chunkers split content into byte-deterministic chunks before Merkle hashing. The
chunker's name is committed in chunking_version — changing the chunker invalidates
all prior cache records under v9.8 admissibility.
"""

from __future__ import annotations

import re
import unicodedata
from dataclasses import dataclass, field
from typing import Protocol


@dataclass(frozen=True)
class Edge:
    """An immutable cross-link pointing from the owning document to another.

    Only ``edge_type`` and ``dst_uri`` are required; the remaining fields
    default to ``None``.
    """

    # Kind of link: wikilink | citation | derived_from | ...
    edge_type: str
    # URI of the link target; always present.
    dst_uri: str
    # Filled in later if/when the target is ingested.
    dst_root: str | None = None
    # Optional fragment / chunk index within the target.
    anchor: str | None = None
@dataclass
class Document:
    """A unit of ingestion: a URI, its content, and its outbound edges.

    ``uri``, ``content`` and ``source_type`` are required. ``edges`` and
    ``extra`` default to a fresh empty list / dict per instance.
    """

    uri: str
    # Normalized text.
    content: str
    # Origin of the document: wikipedia_xml | html | git | ...
    source_type: str
    title: str | None = None
    edges: list[Edge] = field(default_factory=list)
    # Source-specific metadata.
    extra: dict = field(default_factory=dict)
def canonicalize(text: str) -> str:
    """Return the stable normalized form of *text*.

    Applied in order:

    1. Unicode NFC normalization.
    2. Each run of carriage-return, line-feed, tab, form-feed or vertical-tab
       characters is collapsed to a single newline.
    3. Each run of two or more ASCII spaces is collapsed to a single space.
    4. Leading and trailing whitespace is stripped.

    Spaces adjacent to a newline are not merged with it. Any change to the
    output of this function requires a CANONICALIZATION_VERSION bump.
    """
    composed = unicodedata.normalize("NFC", text)
    breaks_collapsed = re.sub(r"[\r\n\t\f\v]+", "\n", composed)
    spaces_collapsed = re.sub(r"[ ]{2,}", " ", breaks_collapsed)
    return spaces_collapsed.strip()
class Chunker(Protocol):
    """Structural interface for anything that splits canonicalized text.

    An implementation exposes a ``name`` string identifying the chunker and a
    ``split`` method returning the chunks in order.
    """

    name: str

    def split(self, text: str) -> list[str]:
        """Split *text* into an ordered list of chunks."""
class TokenChunker:
    """Fixed-size chunker over whitespace-separated tokens (byte-deterministic).

    "Token" here means a whitespace-separated unit, NOT a model BPE token.
    This avoids tokenizer-version drift in the chunking_version.

    Attributes:
        name: Identifier of the chunking scheme. The class-level value
            describes the default 512-token configuration; every instance
            sets its own value from its actual chunk size, so a non-default
            size never reports itself as ``"tok-512-v1"``.
        tokens_per_chunk: Maximum number of tokens placed in one chunk.
    """

    name = "tok-512-v1"

    def __init__(self, tokens_per_chunk: int = 512):
        """Create a chunker emitting at most *tokens_per_chunk* tokens per chunk.

        Raises:
            ValueError: If *tokens_per_chunk* is smaller than 1. Zero would
                fail later inside ``range()``, and a negative value would make
                ``split`` silently return no chunks at all.
        """
        if tokens_per_chunk < 1:
            raise ValueError(
                f"tokens_per_chunk must be >= 1, got {tokens_per_chunk}"
            )
        self.tokens_per_chunk = tokens_per_chunk
        # Keep the advertised name in sync with the real chunk size; for the
        # default size this equals the class-level "tok-512-v1".
        self.name = f"tok-{tokens_per_chunk}-v1"

    def split(self, text: str) -> list[str]:
        """Split *text* into chunks of at most ``tokens_per_chunk`` tokens.

        Tokens are obtained with ``str.split()`` (any whitespace run is a
        separator) and re-joined with single spaces. Empty or whitespace-only
        input yields an empty list; the last chunk may be shorter.
        """
        tokens = text.split()
        step = self.tokens_per_chunk
        return [
            " ".join(tokens[start : start + step])
            for start in range(0, len(tokens), step)
        ]
class SentenceChunker:
    """Sentence-aligned chunker (better for short docs like 2003 Wikipedia).

    A boundary is a whitespace run that follows ``.``, ``!`` or ``?`` and
    precedes an ASCII uppercase letter or digit. Sentences starting with
    anything else (lowercase, quotes, non-ASCII capitals) are not split off.
    """

    name = "sent-v1"
    # NOTE(review): editing this pattern moves chunk boundaries; presumably
    # that calls for a new ``name`` — confirm against the versioning policy.
    _split_re = re.compile(r"(?<=[.!?])\s+(?=[A-Z0-9])")

    def split(self, text: str) -> list[str]:
        """Split *text* into stripped, non-empty sentences, in order.

        Naive but deterministic. Text without any boundary comes back as a
        single stripped chunk. Empty or whitespace-only input yields an empty
        list, so no whitespace-only chunk is ever emitted.
        """
        if not text:
            return []
        pieces = (piece.strip() for piece in self._split_re.split(text))
        return [piece for piece in pieces if piece]
def get_chunker(name: str | None = None) -> Chunker:
    """Return a fresh chunker instance selected by its ``name``.

    ``None`` selects the default, ``TokenChunker``. Otherwise *name* is
    compared against the ``name`` attribute of each known chunker class.

    Raises:
        ValueError: If *name* matches no known chunker.
    """
    if name is None:
        return TokenChunker()
    for chunker_cls in (TokenChunker, SentenceChunker):
        if name == chunker_cls.name:
            return chunker_cls()
    raise ValueError(f"unknown chunker: {name}")