# Source code for arborist.sources.html_page

"""HTML page source.

Fetches URLs, honors robots.txt automatically, strips noise (script/style/nav/
footer/header), extracts main body text + outbound `<a href>` links as edges.

Optional dependency. Install with `pip install arborist[html]`.
"""

from __future__ import annotations

import re
import urllib.parse
import urllib.robotparser
from pathlib import Path
from typing import TYPE_CHECKING, Iterable, Iterator

try:
    import httpx
    from selectolax.parser import HTMLParser
except ImportError as e:  # pragma: no cover
    raise ImportError(
        "HTML source requires extras: pip install 'arborist[html]'"
    ) from e

from arborist.document import Document, Edge
from arborist.source import Source

if TYPE_CHECKING:
    from arborist.sources.loss_report import LossCollector


# Sent as the ``User-Agent`` header on every request and used as the agent
# name when evaluating robots.txt rules.
USER_AGENT = "arborist/0.0.1 (+https://unturf.com)"
# Elements removed from the parsed tree before body text and links are
# extracted.
NOISE_SELECTORS = ("script", "style", "noscript", "nav", "header", "footer", "aside")
# Version tag recorded on loss events as ``canonicalization_version`` and
# handed to the loss collector as ``adapter_version``.
NORMALIZE_VERSION = "html-normalize-v1"
# Adapter name handed to the loss collector.
ADAPTER_NAME = "HtmlPageSource"

# Loss-kind taxonomy per ticket #000022 §2.1.1. Free-string in v0; promoted
# to enum after >=3 adapters. ``html_chrome`` is heuristic — selectolax may
# leave residual nav text on pages that don't tag with semantic elements;
# auditors should treat the kind as advisory, not authoritative.
# Keyed by noise selector; ``parse_html`` falls back to ``"html_chrome"`` for
# any selector that has no entry here.
_NOISE_LOSS_KINDS = {
    "script": "script_block",
    "style": "style_block",
    "noscript": "html_chrome",
    "nav": "html_chrome",
    "header": "html_chrome",
    "footer": "html_chrome",
    "aside": "html_chrome",
}


def _normalize_text(
    text: str,
    *,
    loss_collector: "LossCollector | None" = None,
) -> str:
    pre_len = (
        len(text.encode("utf-8", errors="surrogatepass"))
        if loss_collector is not None
        else 0
    )
    text = re.sub(r"[ \t]+", " ", text)
    text = re.sub(r"\n{3,}", "\n\n", text)
    text = text.strip()
    if loss_collector is not None:
        post_len = len(text.encode("utf-8", errors="surrogatepass"))
        loss_collector.record_delta(
            stage="html_normalize",
            canonicalization_version=NORMALIZE_VERSION,
            loss_kind="whitespace_run",
            loss_mode="normalize",
            bytes_delta=pre_len - post_len,
        )
    return text


def parse_html(
    url: str,
    html: str,
    source_type: str = "html",
    *,
    loss_collector: "LossCollector | None" = None,
) -> Document | None:
    """Pure parse function. Separated so tests can run without network.

    Steps, in order:

    1. Remove every element matching ``NOISE_SELECTORS`` from the tree.
    2. Extract and whitespace-normalize the text of ``<body>`` (or of the
       tree root when there is no ``<body>``).
    3. Read the ``<title>`` text, if any.
    4. Collect outbound ``<a href>`` links as ``hyperlink`` edges. This runs
       after step 1, so links inside removed noise elements are not
       reported.

    When ``loss_collector`` is provided, drops from noise-selector
    decomposition (``<script>``, ``<style>``, ``<nav>``, etc.) and
    whitespace normalization are recorded against the collector, and the
    collected events are attached as ``extra["loss_events"]``. Output
    bytes remain bit-identical to the no-collector path. Default ``None``
    keeps callers stable.

    Args:
        url: URL the HTML was fetched from; used as the document URI and
            as the base for resolving relative links.
        html: Raw HTML markup.
        source_type: Value stored on the resulting document.
        loss_collector: Optional sink for loss events.

    Returns:
        The parsed document, or ``None`` when the page has no body/root
        node or no text is left after normalization.
    """
    tree = HTMLParser(html)

    for sel in NOISE_SELECTORS:
        for node in tree.css(sel):
            if loss_collector is not None:
                kind = _NOISE_LOSS_KINDS.get(sel, "html_chrome")
                dropped_html = node.html or ""
                if dropped_html:
                    loss_collector.add(
                        stage="html_normalize",
                        canonicalization_version=NORMALIZE_VERSION,
                        loss_kind=kind,
                        loss_mode="pure_drop",
                        dropped=dropped_html,
                    )
            # Always strip the node, with or without a collector, so both
            # paths yield the same text.
            node.decompose()

    body = tree.css_first("body") or tree.root
    if body is None:
        return None
    text = _normalize_text(
        body.text(separator="\n", strip=True),
        loss_collector=loss_collector,
    )
    if not text:
        return None

    title_node = tree.css_first("title")
    title = title_node.text(strip=True) if title_node is not None else None

    edges: list[Edge] = []
    # De-duplicate on (target without fragment, fragment).
    seen: set[tuple[str, str]] = set()
    for link in tree.css("a[href]"):
        href = (link.attributes.get("href") or "").strip()
        if not href or href.startswith(("javascript:", "mailto:", "tel:", "#")):
            continue
        try:
            absolute = urllib.parse.urljoin(url, href)
            split = urllib.parse.urlsplit(absolute)
        except ValueError:
            # urllib rejects some malformed hrefs (e.g. an unbalanced
            # "[" in the host -> "Invalid IPv6 URL"). One broken link
            # must not discard the whole page, so skip just that link.
            continue
        if split.scheme not in ("http", "https"):
            continue
        anchor = split.fragment or ""
        dst_uri = urllib.parse.urlunsplit(split._replace(fragment=""))
        key = (dst_uri, anchor)
        if key in seen:
            continue
        seen.add(key)
        edges.append(
            Edge(edge_type="hyperlink", dst_uri=dst_uri, anchor=anchor or None)
        )

    extra: dict = {}
    if loss_collector is not None:
        events = loss_collector.events()
        if events:
            # Document-scope losses anchor to the doc's first chunk at
            # ingest time. We stash them in extra; ingest reads and
            # persists. See ticket #000022 §3.4.
            extra["loss_events"] = events

    return Document(
        uri=url,
        content=text,
        source_type=source_type,
        title=title,
        edges=edges,
        extra=extra,
    )
class HtmlPageSource(Source):
    """Iterates a list of URLs, fetching and parsing each as HTML.

    Each URL is fetched with a shared ``httpx.Client``, optionally gated by
    the origin's robots.txt, and handed to ``parse_html``. URLs that are
    disallowed, fail to fetch, or are not HTML/XML are skipped silently.
    """

    source_type = "html"

    def __init__(
        self,
        urls: Iterable[str],
        *,
        respect_robots: bool = True,
        timeout: float = 30.0,
        loss_report_enabled: bool = True,
        loss_report_excerpts: bool = True,
        loss_report_max_excerpt_bytes: int = 200,
        default_author: str | None = None,
    ):
        """Configure the source.

        Args:
            urls: URLs to fetch; materialized into a list immediately.
            respect_robots: When true, URLs disallowed by the origin's
                robots.txt are skipped.
            timeout: Timeout handed to ``httpx.Client``.
            loss_report_enabled: When true, a fresh ``LossCollector`` is
                created per fetched page and passed to ``parse_html``.
            loss_report_excerpts: Forwarded to the collector as
                ``excerpts_enabled``.
            loss_report_max_excerpt_bytes: Forwarded to the collector as
                ``max_excerpt_bytes``.
            default_author: See below.

        ``default_author`` (Phase 1 follow-up of #000031) — when the
        ingested HTML's ``<title>`` doesn't already include the author
        surname, this string is appended (``"<title>, by <author>"``).
        Surfaces the author signal into the warrant resolver's
        ``_shard_matches_citation`` haystack without a per-shard SQL UPDATE
        workaround. Wikisource and Project Gutenberg HTML pages rarely
        include author in ``<title>`` — the manifest entry carries it
        instead. Skipped when None or empty.
        """
        self.urls = list(urls)
        self.respect_robots = respect_robots
        self.timeout = timeout
        self.loss_report_enabled = loss_report_enabled
        self.loss_report_excerpts = loss_report_excerpts
        self.loss_report_max_excerpt_bytes = loss_report_max_excerpt_bytes
        self.default_author = (default_author or "").strip()
        # One parsed robots.txt per origin ("scheme://netloc").
        self._robots_cache: dict[str, urllib.robotparser.RobotFileParser] = {}

    @classmethod
    def from_file(cls, path: str | Path, **kwargs) -> HtmlPageSource:
        """Build a source from a UTF-8 text file with one URL per line.

        Blank lines and lines whose first non-blank character is ``#`` are
        ignored. Remaining keyword arguments are forwarded to the
        constructor.
        """
        urls = [
            line.strip()
            for line in Path(path).read_text(encoding="utf-8").splitlines()
            if line.strip() and not line.lstrip().startswith("#")
        ]
        return cls(urls, **kwargs)

    def iter_documents(self) -> Iterator[Document]:
        """Fetch each configured URL and yield the parsed documents.

        A URL is skipped (best effort, no exception) when robots.txt
        disallows it, when the request fails or returns an error status,
        when the ``Content-Type`` header mentions neither ``html`` nor
        ``xml``, or when ``parse_html`` returns ``None``. Redirects are
        followed; the document URI is the final response URL.
        """
        with httpx.Client(
            headers={"User-Agent": USER_AGENT},
            timeout=self.timeout,
            follow_redirects=True,
        ) as client:
            for url in self.urls:
                if self.respect_robots and not self._allowed(client, url):
                    continue
                try:
                    resp = client.get(url)
                    resp.raise_for_status()
                except httpx.HTTPError:
                    # Deliberate best effort: one bad URL must not stop
                    # the rest of the list.
                    continue
                ctype = resp.headers.get("content-type", "").lower()
                if "html" not in ctype and "xml" not in ctype:
                    continue
                doc = parse_html(
                    str(resp.url),
                    resp.text,
                    self.source_type,
                    loss_collector=self._new_loss_collector(),
                )
                if doc is not None:
                    yield self._with_author_appended(doc)

    def _new_loss_collector(self) -> "LossCollector | None":
        """Return a fresh per-page collector, or ``None`` when disabled."""
        if not self.loss_report_enabled:
            return None
        # Imported lazily so the module is only loaded when loss reporting
        # is actually used.
        from arborist.sources.loss_report import LossCollector

        return LossCollector(
            excerpts_enabled=self.loss_report_excerpts,
            max_excerpt_bytes=self.loss_report_max_excerpt_bytes,
            adapter_name=ADAPTER_NAME,
            adapter_version=NORMALIZE_VERSION,
        )

    def _allowed(self, client: "httpx.Client", url: str) -> bool:
        """Return whether robots.txt of *url*'s origin permits fetching it.

        The parsed robots.txt is cached per origin for the lifetime of this
        source. Outcome of the robots.txt request, following RFC 9309:

        * 200 — rules are parsed and applied.
        * 5xx or a transport error — robots.txt is *unreachable*; assume
          complete disallow (RFC 9309 §2.3.1.4).
        * anything else (notably 4xx) — robots.txt is *unavailable*; no
          rules apply (RFC 9309 §2.3.1.3).
        """
        parsed = urllib.parse.urlparse(url)
        origin = f"{parsed.scheme}://{parsed.netloc}"
        rp = self._robots_cache.get(origin)
        if rp is None:
            rp = urllib.robotparser.RobotFileParser()
            try:
                resp = client.get(f"{origin}/robots.txt")
            except httpx.HTTPError:
                # Network failure: the rules are unknown, not absent.
                rp.disallow_all = True
            else:
                if resp.status_code == 200:
                    rp.parse(resp.text.splitlines())
                elif resp.status_code >= 500:
                    # Server error: rules are unknown, so stay out.
                    rp.disallow_all = True
                else:
                    # Missing robots.txt = no rules per RFC 9309.
                    rp.allow_all = True
            self._robots_cache[origin] = rp
        return rp.can_fetch(USER_AGENT, url)

    def _with_author_appended(self, doc: "Document | None") -> "Document | None":
        """Phase 1 follow-up of #000031 — when the source ingest produces a
        Document whose title doesn't carry the author surname, append
        ``", by <default_author>"`` so the resolver's author-surname
        haystack check fires.

        Suppress the append when ``default_author`` is empty OR when the
        existing title already contains a surname token of length >= 4 from
        the configured author (case-insensitive substring).

        Idempotent — re-ingesting the same URL with the same config
        produces the same Document content_root because content doesn't
        change, only the title text appended.
        """
        if not self.default_author or not doc:
            return doc
        title = (doc.title or "").strip()
        # If any surname-shaped token from default_author already
        # appears in title (case-insensitive), don't append.
        title_l = title.lower()
        for tok in self.default_author.split():
            tok_l = tok.lower()
            if len(tok_l) >= 4 and tok_l in title_l:
                return doc
        new_title = (
            f"{title}, by {self.default_author}"
            if title
            else f"by {self.default_author}"
        )
        # Document is a frozen-ish dataclass; replace title.
        from dataclasses import replace

        return replace(doc, title=new_title)