Source code for arborist.distill.tfidf

"""TF-IDF top-K keyword distiller.

Pure-Python, stdlib-only. Uses chunk-level "documents" as the corpus
baseline so a term that appears in every chunk of the same source gets
penalized relative to a term concentrated in fewer chunks.

Output core content is a deterministic comma-separated keyword list —
extreme compression toward the "tweet/haiku" end of the planet metaphor.
"""

from __future__ import annotations

import math
import re
from collections import Counter

from arborist.distill.base import DistillationResult, Distiller
from arborist.document import Document


_TOKEN_RE = re.compile(r"\b[a-zA-Z][a-zA-Z\-']{2,}\b")
_STOPWORDS = frozenset(
    """
    the and or in of to is are was were for with on as by an be this that
    these those from at but not have has had been they their his her its
    our we you he she it all any if no more than such also can may will
    would should could do does did between which where when what who how
    some many most into through during above below after before since
    while well much even only still about other only own same so very
    just over under
    """.split()
)


def _tokenize(text: str) -> list[str]:
    return [t.lower() for t in _TOKEN_RE.findall(text) if t.lower() not in _STOPWORDS]


[docs] class TfidfKeywordDistiller(Distiller): name = "tfidf-keywords-v1" def __init__(self, top_k: int = 16): self.top_k = top_k
[docs] def distill( self, source: Document, source_chunks: list[str] ) -> DistillationResult: if not source_chunks: return self._empty_result(source) chunk_tokens = [_tokenize(c) for c in source_chunks] n = len(chunk_tokens) df: Counter[str] = Counter() for toks in chunk_tokens: df.update(set(toks)) tf: Counter[str] = Counter() for toks in chunk_tokens: tf.update(toks) scores: dict[str, float] = {} for t, f in tf.items(): idf = math.log((n + 1) / (df[t] + 1)) + 1.0 scores[t] = f * idf ranked = sorted(scores.items(), key=lambda kv: (-kv[1], kv[0])) keywords = [t for t, _ in ranked[: self.top_k]] kw_set = set(keywords) contributing: set[int] = set() for i, toks in enumerate(chunk_tokens): if any(t in kw_set for t in toks): contributing.add(i) core_text = ", ".join(keywords) return DistillationResult( core=Document( uri=f"{source.uri}#core/{self.name}", content=core_text, source_type=f"core:{self.name}", title=(source.title or "") + " [KEYWORDS]", ), contributing_chunk_indices=sorted(contributing), )
def _empty_result(self, source: Document) -> DistillationResult: return DistillationResult( core=Document( uri=f"{source.uri}#core/{self.name}", content="", source_type=f"core:{self.name}", title=(source.title or "") + " [KEYWORDS]", ), contributing_chunk_indices=[], )